GH-36892: [C++] Fix performance regressions in FieldPath::Get (#37032)
### Rationale for this change

#35197 appears to have introduced significant performance regressions in `FieldPath::Get`, as indicated [here](https://conbench.ursa.dev/compare/runs/9cf73ac83f0a44179e6538b2c1c7babd...3d76cb5ffb8849bf8c3ea9b32d08b3b7/) by a benchmark that uses a wide (10K-column) dataframe.
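
For reference, a minimal sketch of the API being measured (the helper name is illustrative, not part of this change); `FieldPath::Get` resolves a column, possibly nested, from a sequence of integer indices:

```cpp
#include <memory>

#include "arrow/api.h"

// Resolve the i-th top-level column of a RecordBatch via FieldPath.
// Get() returns a Result, so an out-of-bounds index surfaces as a
// Status rather than undefined behavior.
arrow::Result<std::shared_ptr<arrow::Array>> GetColumnByPath(
    const arrow::RecordBatch& batch, int i) {
  arrow::FieldPath path({i});
  return path.Get(batch);
}
```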

### What changes are included in this PR?

- Adds basic benchmarks for `FieldPath::Get` across various input types, since none previously existed
- Addresses several performance issues, which took the form of extremely high upfront costs in the `RecordBatch` and `ArrayData` overloads specifically (see the sketch after this list)
- Some minor refactoring of `NestedSelector`
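
The central fix, in a simplified sketch (the free-function form and the name `GetChildData` are hypothetical; the real logic lives in `NestedSelector::GetChild` in the diff below): rather than round-tripping an `ArrayData` through `MakeArray` to fetch a child, the child `ArrayData` is taken directly and sliced only when the parent carries a nonzero offset or a different length:

```cpp
#include <memory>

#include "arrow/array/data.h"

// O(1) per child lookup: no Array wrappers are materialized for the
// parent's other children, which is what made wide inputs expensive.
std::shared_ptr<arrow::ArrayData> GetChildData(const arrow::ArrayData& data,
                                               int i) {
  auto child = data.child_data[i];
  if (data.offset != 0 || child->length != data.length) {
    child = child->Slice(data.offset, data.length);
  }
  return child;
}
```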

### Are these changes tested?

Yes (covered by existing tests)

### Are there any user-facing changes?

No

* Closes: #36892

Lead-authored-by: benibus <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
benibus and pitrou authored Aug 8, 2023
1 parent c6c21c0 commit 59f30f0
Showing 2 changed files with 191 additions and 36 deletions.
102 changes: 66 additions & 36 deletions cpp/src/arrow/type.cc
@@ -1066,17 +1066,29 @@ std::string FieldPath::ToString() const {
return repr;
}

static Status NonStructError() {
return Status::NotImplemented("Get child data of non-struct array");
}
struct NestedSelectorUtil {
static Status NonStructError() {
return Status::NotImplemented("Get child data of non-struct array");
}

template <typename T>
static const DataType* GetType(const T& input) {
if constexpr (std::is_same_v<T, ArrayData>) {
return input.type.get();
} else {
return input.type().get();
}
}
};

// Utility class for retrieving a child field/column from a top-level Field, Array, or
// ChunkedArray. The "root" value can either be a single parent or a vector of its
// children.
// Utility class for retrieving a child field/column from a top-level Field, Array,
// ArrayData, or ChunkedArray. The "root" value can either be a single parent or a vector
// of its children.
template <typename T, bool IsFlattening = false>
class NestedSelector {
public:
using ArrowType = T;
using Util = NestedSelectorUtil;

explicit NestedSelector(const std::vector<std::shared_ptr<T>>& children)
: parent_or_children_(&children) {}
@@ -1095,7 +1107,18 @@ class NestedSelector {
Result<NestedSelector> GetChild(int i) const {
std::shared_ptr<T> child;
if (auto parent = get_parent()) {
ARROW_ASSIGN_OR_RAISE(child, GetChild(*parent, i, pool_));
const DataType* type = Util::GetType(*parent);
// We avoid this check for schema fields since it's inconsequential (plus there are
// tests elsewhere that rely on it not happening)
if constexpr (!std::is_same_v<T, Field>) {
if (ARROW_PREDICT_FALSE(type->id() != Type::STRUCT)) {
return Util::NonStructError();
}
}
// Bounds-check the index *once* using the parent's type
if (ARROW_PREDICT_TRUE(i >= 0 && i < type->num_fields())) {
ARROW_ASSIGN_OR_RAISE(child, GetChild(*parent, i, pool_));
}
} else if (auto children = get_children()) {
if (ARROW_PREDICT_TRUE(i >= 0 && static_cast<size_t>(i) < children->size())) {
child = (*children)[i];
@@ -1129,10 +1152,10 @@ class NestedSelector {
*os << "column types: { ";
if (auto children = get_children()) {
for (const auto& child : *children) {
*os << *child->type() << ", ";
*os << *Util::GetType(*child) << ", ";
}
} else if (auto parent = get_parent()) {
for (const auto& field : parent->type()->fields()) {
for (const auto& field : Util::GetType(*parent)->fields()) {
*os << *field->type() << ", ";
}
}
@@ -1155,21 +1178,33 @@
}

static Result<std::shared_ptr<Field>> GetChild(const Field& field, int i, MemoryPool*) {
if (ARROW_PREDICT_FALSE(i < 0 || i >= field.type()->num_fields())) {
return nullptr;
}
return field.type()->field(i);
}

static Result<std::shared_ptr<Array>> GetChild(const Array& array, int i,
MemoryPool* pool) {
if (ARROW_PREDICT_FALSE(array.type_id() != Type::STRUCT)) {
return NonStructError();
}
if (ARROW_PREDICT_FALSE(i < 0 || i >= array.num_fields())) {
return nullptr;
static Result<std::shared_ptr<ArrayData>> GetChild(const ArrayData& data, int i,
MemoryPool* pool) {
std::shared_ptr<ArrayData> child_data;
if constexpr (IsFlattening) {
// First, convert to an Array so we can use StructArray::GetFlattenedField
auto array = MakeArray(data.Copy());
ARROW_ASSIGN_OR_RAISE(auto child_array, GetChild(*array, i, pool));
child_data = child_array->data();
} else {
// We could achieve the same result by converting to an Array (via MakeArray),
// calling StructArray::field(i), and pulling out the new ArrayData. However, this
// process can be very expensive when there are many columns - so we just
// reimplement the functionality that we need
child_data = data.child_data[i];
if (data.offset != 0 || data.child_data[i]->length != data.length) {
child_data = child_data->Slice(data.offset, data.length);
}
}

return std::move(child_data);
}

static Result<std::shared_ptr<Array>> GetChild(const Array& array, int i,
MemoryPool* pool) {
const auto& struct_array = checked_cast<const StructArray&>(array);
if constexpr (IsFlattening) {
return struct_array.GetFlattenedField(i, pool);
@@ -1181,22 +1216,15 @@
static Result<std::shared_ptr<ChunkedArray>> GetChild(const ChunkedArray& chunked_array,
int i, MemoryPool* pool) {
const auto& type = *chunked_array.type();
if (ARROW_PREDICT_FALSE(type.id() != Type::STRUCT)) {
return NonStructError();
}
if (ARROW_PREDICT_FALSE(i < 0 || i >= type.num_fields())) {
return nullptr;
}

ArrayVector chunks;
chunks.reserve(chunked_array.num_chunks());
for (const auto& parent_chunk : chunked_array.chunks()) {
ARROW_ASSIGN_OR_RAISE(auto chunk, GetChild(*parent_chunk, i, pool));
if (!chunk) return nullptr;
chunks.push_back(std::move(chunk));
}

return ChunkedArray::Make(std::move(chunks), type.field(i)->type());
return std::make_shared<ChunkedArray>(std::move(chunks), type.field(i)->type());
}

std::shared_ptr<T> owned_parent_;
@@ -1289,7 +1317,11 @@ Result<std::shared_ptr<Schema>> FieldPath::GetAll(const Schema& schm,
}

Result<std::shared_ptr<Array>> FieldPath::Get(const RecordBatch& batch) const {
return FieldPathGetImpl::Get(this, ZeroCopySelector<Array>(batch.columns()));
// Deliberately calling `column_data` here because `RecordBatch::columns` is nontrivial
ARROW_ASSIGN_OR_RAISE(
auto data,
FieldPathGetImpl::Get(this, ZeroCopySelector<ArrayData>(batch.column_data())));
return MakeArray(data);
}

Result<std::shared_ptr<ChunkedArray>> FieldPath::Get(const Table& table) const {
@@ -1301,11 +1333,7 @@ Result<std::shared_ptr<Array>> FieldPath::Get(const Array& array) const {
}

Result<std::shared_ptr<ArrayData>> FieldPath::Get(const ArrayData& data) const {
// We indirect from ArrayData to Array rather than vice-versa because, when selecting a
// nested column, the StructArray::field method does the work of adjusting the data's
// offset/length if necessary.
ARROW_ASSIGN_OR_RAISE(auto array, Get(*MakeArray(data.Copy())));
return array->data();
return FieldPathGetImpl::Get(this, ZeroCopySelector<ArrayData>(data));
}

Result<std::shared_ptr<ChunkedArray>> FieldPath::Get(
@@ -1320,8 +1348,7 @@ Result<std::shared_ptr<Array>> FieldPath::GetFlattened(const Array& array,

Result<std::shared_ptr<ArrayData>> FieldPath::GetFlattened(const ArrayData& data,
MemoryPool* pool) const {
ARROW_ASSIGN_OR_RAISE(auto array, GetFlattened(*MakeArray(data.Copy()), pool));
return array->data();
return FieldPathGetImpl::Get(this, FlatteningSelector<ArrayData>(data, pool));
}

Result<std::shared_ptr<ChunkedArray>> FieldPath::GetFlattened(
@@ -1332,7 +1359,10 @@ Result<std::shared_ptr<ChunkedArray>> FieldPath::GetFlattened(

Result<std::shared_ptr<Array>> FieldPath::GetFlattened(const RecordBatch& batch,
MemoryPool* pool) const {
return FieldPathGetImpl::Get(this, FlatteningSelector<Array>(batch.columns(), pool));
ARROW_ASSIGN_OR_RAISE(
auto data, FieldPathGetImpl::Get(
this, FlatteningSelector<ArrayData>(batch.column_data(), pool)));
return MakeArray(data);
}

Result<std::shared_ptr<ChunkedArray>> FieldPath::GetFlattened(const Table& table,
125 changes: 125 additions & 0 deletions cpp/src/arrow/type_benchmark.cc
@@ -18,15 +18,19 @@
#include <algorithm>
#include <cstdint>
#include <exception>
#include <optional>
#include <random>
#include <string>
#include <vector>

#include "benchmark/benchmark.h"

#include "arrow/array.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/macros.h"
@@ -418,6 +422,120 @@ static void ErrorSchemeExceptionNoInline(
state.SetItemsProcessed(state.iterations() * integers.size());
}

// ----------------------------------------------------------------------
// FieldPath::Get benchmarks

static std::shared_ptr<Schema> GenerateTestSchema(int num_columns) {
FieldVector fields(num_columns);
for (int i = 0; i < num_columns; ++i) {
auto name = std::string("f") + std::to_string(i);
fields[i] = field(std::move(name), int64());
}
return schema(std::move(fields));
}

static std::shared_ptr<Array> GenerateTestArray(int num_columns) {
constexpr int64_t kLength = 100;

auto rand = random::RandomArrayGenerator(0xbeef);
auto schm = GenerateTestSchema(num_columns);

ArrayVector columns(num_columns);
for (auto& column : columns) {
column = rand.Int64(kLength, 0, std::numeric_limits<int64_t>::max());
}

return *StructArray::Make(columns, schm->fields());
}

static std::shared_ptr<RecordBatch> ToBatch(const std::shared_ptr<Array>& array) {
return *RecordBatch::FromStructArray(array);
}

static std::shared_ptr<ChunkedArray> ToChunked(const std::shared_ptr<Array>& array,
double chunk_proportion = 1.0) {
auto struct_array = internal::checked_pointer_cast<StructArray>(array);
const auto num_rows = struct_array->length();
const auto chunk_length = static_cast<int64_t>(std::ceil(num_rows * chunk_proportion));

ArrayVector chunks;
for (int64_t offset = 0; offset < num_rows;) {
int64_t slice_length = std::min(chunk_length, num_rows - offset);
chunks.push_back(*struct_array->SliceSafe(offset, slice_length));
offset += slice_length;
}

return *ChunkedArray::Make(std::move(chunks));
}

static std::shared_ptr<Table> ToTable(const std::shared_ptr<Array>& array,
double chunk_proportion = 1.0) {
return *Table::FromChunkedStructArray(ToChunked(array, chunk_proportion));
}

template <typename T>
static void BenchmarkFieldPathGet(benchmark::State& state, // NOLINT non-const reference
const T& input, int num_columns,
std::optional<int> num_chunks = {}) {
// Reassigning a single FieldPath var within each iteration's scope seems to be costly
// enough to influence the timings, so we preprocess them.
std::vector<FieldPath> paths(num_columns);
for (int i = 0; i < num_columns; ++i) {
paths[i] = {i};
}

for (auto _ : state) {
for (const auto& path : paths) {
benchmark::DoNotOptimize(path.Get(input).ValueOrDie());
}
}

state.SetItemsProcessed(state.iterations() * num_columns);
state.counters["num_columns"] = num_columns;
if (num_chunks.has_value()) {
state.counters["num_chunks"] = num_chunks.value();
}
}

static void FieldPathGetFromWideArray(
benchmark::State& state) { // NOLINT non-const reference
constexpr int kNumColumns = 10000;
auto array = GenerateTestArray(kNumColumns);
BenchmarkFieldPathGet(state, *array, kNumColumns);
}

static void FieldPathGetFromWideArrayData(
benchmark::State& state) { // NOLINT non-const reference
constexpr int kNumColumns = 10000;
auto array = GenerateTestArray(kNumColumns);
BenchmarkFieldPathGet(state, *array->data(), kNumColumns);
}

static void FieldPathGetFromWideBatch(
benchmark::State& state) { // NOLINT non-const reference
constexpr int kNumColumns = 10000;
auto batch = ToBatch(GenerateTestArray(kNumColumns));
BenchmarkFieldPathGet(state, *batch, kNumColumns);
}

static void FieldPathGetFromWideChunkedArray(
benchmark::State& state) { // NOLINT non-const reference
constexpr int kNumColumns = 10000;
// Percentage representing the size of each chunk relative to the total length (smaller
// proportion means more chunks)
const double chunk_proportion = state.range(0) / 100.0;
auto chunked_array = ToChunked(GenerateTestArray(kNumColumns), chunk_proportion);
BenchmarkFieldPathGet(state, *chunked_array, kNumColumns, chunked_array->num_chunks());
}

static void FieldPathGetFromWideTable(
benchmark::State& state) { // NOLINT non-const reference
constexpr int kNumColumns = 10000;
const double chunk_proportion = state.range(0) / 100.0;
auto table = ToTable(GenerateTestArray(kNumColumns), chunk_proportion);
BenchmarkFieldPathGet(state, *table, kNumColumns, table->column(0)->num_chunks());
}

BENCHMARK(TypeEqualsSimple);
BENCHMARK(TypeEqualsComplex);
BENCHMARK(TypeEqualsWithMetadata);
@@ -436,4 +554,11 @@ BENCHMARK(ErrorSchemeStatusNoInline);
BENCHMARK(ErrorSchemeResultNoInline);
BENCHMARK(ErrorSchemeExceptionNoInline);

BENCHMARK(FieldPathGetFromWideArray);
BENCHMARK(FieldPathGetFromWideArrayData);
BENCHMARK(FieldPathGetFromWideBatch);

BENCHMARK(FieldPathGetFromWideChunkedArray)->Arg(2)->Arg(10)->Arg(25)->Arg(100);
BENCHMARK(FieldPathGetFromWideTable)->Arg(2)->Arg(10)->Arg(25)->Arg(100);

} // namespace arrow
