Skip to content

Commit

Permalink
Extract as a function
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Oct 30, 2024
1 parent 06f29be commit 12b1a97
Showing 1 changed file with 130 additions and 125 deletions.
255 changes: 130 additions & 125 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,95 +470,102 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo(
return Make(schema_, num_rows(), std::move(copied_columns));
}

namespace {
struct EnumeratedStatistics {
int nth_statistics = 0;
bool start_new_column = false;
std::optional<int32_t> nth_column = std::nullopt;
const char* key = nullptr;
std::shared_ptr<DataType> type = nullptr;
ArrayStatistics::ValueType value = false;
};
using OnStatistics =
std::function<Status(const EnumeratedStatistics& enumerated_statistics)>;
Status EnumerateStatistics(const RecordBatch& record_batch, OnStatistics on_statistics) {
EnumeratedStatistics statistics;
statistics.nth_statistics = 0;
statistics.start_new_column = true;
statistics.nth_column = std::nullopt;
statistics.key = ARROW_STATISTICS_KEY_ROW_COUNT_EXACT;
statistics.type = int64();
statistics.value = record_batch.num_rows();
RETURN_NOT_OK(on_statistics(statistics));

int num_fields = record_batch.schema()->num_fields();
for (int nth_column = 0; nth_column < num_fields; ++nth_column) {
auto column_statistics = record_batch.column(nth_column)->statistics();
if (!column_statistics) {
continue;
}

statistics.start_new_column = true;
statistics.nth_column = nth_column;
if (column_statistics->null_count.has_value()) {
statistics.nth_statistics++;
statistics.key = ARROW_STATISTICS_KEY_NULL_COUNT_EXACT;
statistics.type = int64();
statistics.value = column_statistics->null_count.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}

if (column_statistics->distinct_count.has_value()) {
statistics.nth_statistics++;
statistics.key = ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT;
statistics.type = int64();
statistics.value = column_statistics->distinct_count.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}

if (column_statistics->min.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_min_exact) {
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE;
}
statistics.type = column_statistics->MinArrowType();
statistics.value = column_statistics->min.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}

if (column_statistics->max.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_max_exact) {
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE;
}
statistics.type = column_statistics->MaxArrowType();
statistics.value = column_statistics->max.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}
}
return Status::OK();
}
} // namespace

Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
MemoryPool* memory_pool) const {
auto enumerate_statistics =
[&](std::function<Status(int nth_statistics, bool start_new_column,
std::optional<int32_t> nth_column, const char* key,
const std::shared_ptr<DataType>& type,
const ArrayStatistics::ValueType& value)>
yield) {
int nth_statistics = 0;
RETURN_NOT_OK(yield(nth_statistics++, true, std::nullopt,
ARROW_STATISTICS_KEY_ROW_COUNT_EXACT, int64(),
ArrayStatistics::ValueType{num_rows_}));

int num_fields = schema_->num_fields();
for (int nth_column = 0; nth_column < num_fields; ++nth_column) {
auto statistics = column(nth_column)->statistics();
if (!statistics) {
continue;
}

bool start_new_column = true;
if (statistics->null_count.has_value()) {
RETURN_NOT_OK(yield(
nth_statistics++, start_new_column, std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_NULL_COUNT_EXACT, int64(),
ArrayStatistics::ValueType{statistics->null_count.value()}));
start_new_column = false;
}

if (statistics->distinct_count.has_value()) {
RETURN_NOT_OK(yield(
nth_statistics++, start_new_column, std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT, int64(),
ArrayStatistics::ValueType{statistics->distinct_count.value()}));
start_new_column = false;
}

if (statistics->min.has_value()) {
if (statistics->is_min_exact) {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MIN_VALUE_EXACT,
statistics->MinArrowType(), statistics->min.value()));
} else {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE,
statistics->MinArrowType(), statistics->min.value()));
}
start_new_column = false;
}

if (statistics->max.has_value()) {
if (statistics->is_max_exact) {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MAX_VALUE_EXACT,
statistics->MaxArrowType(), statistics->max.value()));
} else {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE,
statistics->MaxArrowType(), statistics->max.value()));
}
start_new_column = false;
}
}
return Status::OK();
};

std::vector<std::shared_ptr<Field>> values_types;
std::vector<int8_t> values_type_indexes;
RETURN_NOT_OK(enumerate_statistics(
[&](int nth_statistics, bool start_new_column, std::optional<int32_t> nth_column,
const char* key, const std::shared_ptr<DataType>& type,
const ArrayStatistics::ValueType& value) {
int8_t i = 0;
for (const auto& field : values_types) {
if (field->type()->id() == type->id()) {
break;
}
i++;
}
if (i == static_cast<int8_t>(values_types.size())) {
values_types.push_back(field(type->name(), type));
}
values_type_indexes.push_back(i);
return Status::OK();
}));
RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
int8_t i = 0;
for (const auto& field : values_types) {
if (field->type()->id() == statistics.type->id()) {
break;
}
i++;
}
if (i == static_cast<int8_t>(values_types.size())) {
values_types.push_back(field(statistics.type->name(), statistics.type));
}
values_type_indexes.push_back(i);
return Status::OK();
}));

auto keys_type = dictionary(int32(), utf8(), false);
auto values_type = dense_union(values_types);
Expand All @@ -584,46 +591,44 @@ Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(values_builder));
StructBuilder builder(statistics_type, memory_pool, std::move(field_builders));

RETURN_NOT_OK(enumerate_statistics(
[&](int nth_statistics, bool start_new_column, std::optional<int32_t> nth_column,
const char* key, const std::shared_ptr<DataType>& type,
const ArrayStatistics::ValueType& value) {
if (start_new_column) {
RETURN_NOT_OK(builder.Append());
if (nth_column.has_value()) {
RETURN_NOT_OK(columns_builder->Append(nth_column.value()));
} else {
RETURN_NOT_OK(columns_builder->AppendNull());
}
RETURN_NOT_OK(values_builder->Append());
}
RETURN_NOT_OK(keys_builder->Append(key, static_cast<int32_t>(strlen(key))));
const auto values_type_index = values_type_indexes[nth_statistics];
RETURN_NOT_OK(items_builder->Append(values_type_index));
struct Visitor {
ArrayBuilder* builder;

Status operator()(const bool& value) {
return static_cast<BooleanBuilder*>(builder)->Append(value);
}
Status operator()(const int64_t& value) {
return static_cast<Int64Builder*>(builder)->Append(value);
}
Status operator()(const uint64_t& value) {
return static_cast<UInt64Builder*>(builder)->Append(value);
}
Status operator()(const double& value) {
return static_cast<DoubleBuilder*>(builder)->Append(value);
}
Status operator()(const std::string& value) {
return static_cast<StringBuilder*>(builder)->Append(
value.data(), static_cast<int32_t>(value.size()));
}
} visitor;
visitor.builder = values_builders[values_type_index].get();
RETURN_NOT_OK(std::visit(visitor, value));
return Status::OK();
}));
RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
if (statistics.start_new_column) {
RETURN_NOT_OK(builder.Append());
if (statistics.nth_column.has_value()) {
RETURN_NOT_OK(columns_builder->Append(statistics.nth_column.value()));
} else {
RETURN_NOT_OK(columns_builder->AppendNull());
}
RETURN_NOT_OK(values_builder->Append());
}
RETURN_NOT_OK(keys_builder->Append(statistics.key,
static_cast<int32_t>(strlen(statistics.key))));
const auto values_type_index = values_type_indexes[statistics.nth_statistics];
RETURN_NOT_OK(items_builder->Append(values_type_index));
struct Visitor {
ArrayBuilder* builder;

Status operator()(const bool& value) {
return static_cast<BooleanBuilder*>(builder)->Append(value);
}
Status operator()(const int64_t& value) {
return static_cast<Int64Builder*>(builder)->Append(value);
}
Status operator()(const uint64_t& value) {
return static_cast<UInt64Builder*>(builder)->Append(value);
}
Status operator()(const double& value) {
return static_cast<DoubleBuilder*>(builder)->Append(value);
}
Status operator()(const std::string& value) {
return static_cast<StringBuilder*>(builder)->Append(
value.data(), static_cast<int32_t>(value.size()));
}
} visitor;
visitor.builder = values_builders[values_type_index].get();
RETURN_NOT_OK(std::visit(visitor, statistics.value));
return Status::OK();
}));

return builder.Finish();
}
Expand Down

0 comments on commit 12b1a97

Please sign in to comment.