Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-712] Optimize Array split and support nested Array (#713)
Browse files Browse the repository at this point in the history
* Add first commit

* Fix compile error

* Add customer cases

* Fix LargeList issue and add LargeList unit tests

* Clean code

* Trigger GitHub Actions

* Fix clang format
  • Loading branch information
zhixingheyi-tian authored Feb 9, 2022
1 parent 35caf0a commit 07020ba
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 128 deletions.
145 changes: 29 additions & 116 deletions native-sql-engine/cpp/src/shuffle/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ arrow::Status Splitter::Init() {
list_array_idx_.push_back(i);
break;
case arrow::LargeListType::type_id:
large_list_array_idx_.push_back(i);
list_array_idx_.push_back(i);
break;
case arrow::NullType::type_id:
break;
Expand Down Expand Up @@ -330,10 +330,6 @@ arrow::Status Splitter::Init() {
for (auto i = 0; i < list_array_idx_.size(); ++i) {
partition_list_builders_[i].resize(num_partitions_);
}
partition_large_list_builders_.resize(large_list_array_idx_.size());
for (auto i = 0; i < large_list_array_idx_.size(); ++i) {
partition_large_list_builders_[i].resize(num_partitions_);
}

ARROW_ASSIGN_OR_RAISE(configured_dirs_, GetConfiguredLocalDirs());
sub_dir_selection_.assign(configured_dirs_.size(), 0);
Expand Down Expand Up @@ -476,7 +472,6 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
auto binary_idx = 0;
auto large_binary_idx = 0;
auto list_idx = 0;
auto large_list_idx = 0;
auto num_fields = schema_->num_fields();
auto num_rows = partition_buffer_idx_base_[partition_id];
auto buffer_sizes = 0;
Expand Down Expand Up @@ -530,7 +525,7 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
break;
}
case arrow::LargeListType::type_id: {
auto& builder = partition_large_list_builders_[large_list_idx][partition_id];
auto& builder = partition_list_builders_[list_idx][partition_id];
if (reset_buffers) {
RETURN_NOT_OK(builder->Finish(&arrays[i]));
builder->Reset();
Expand All @@ -539,7 +534,7 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
builder->Reset();
RETURN_NOT_OK(builder->Reserve(num_rows));
}
large_list_idx++;
list_idx++;
break;
}
case arrow::NullType::type_id: {
Expand Down Expand Up @@ -593,12 +588,10 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
auto binary_idx = 0;
auto large_binary_idx = 0;
auto list_idx = 0;
auto large_list_idx = 0;

std::vector<std::shared_ptr<arrow::BinaryBuilder>> new_binary_builders;
std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>> new_large_binary_builders;
std::vector<std::shared_ptr<arrow::ListBuilder>> new_list_builders;
std::vector<std::shared_ptr<arrow::LargeListBuilder>> new_large_list_builders;
std::vector<std::shared_ptr<arrow::ArrayBuilder>> new_list_builders;
std::vector<std::shared_ptr<arrow::ResizableBuffer>> new_value_buffers;
std::vector<std::shared_ptr<arrow::ResizableBuffer>> new_validity_buffers;
for (auto i = 0; i < num_fields; ++i) {
Expand Down Expand Up @@ -626,30 +619,23 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
break;
}
case arrow::ListType::type_id: {
auto child_type =
std::dynamic_pointer_cast<arrow::ListType>(column_type_id_[i])->value_type();
std::unique_ptr<arrow::ArrayBuilder> child_builder;
RETURN_NOT_OK(MakeBuilder(options_.memory_pool, child_type, &child_builder));
auto builder = std::make_shared<arrow::ListBuilder>(options_.memory_pool,
std::move(child_builder));
assert(builder != nullptr);
RETURN_NOT_OK(builder->Reserve(new_size));
new_list_builders.push_back(std::move(builder));
std::unique_ptr<arrow::ArrayBuilder> array_builder;
RETURN_NOT_OK(
MakeBuilder(options_.memory_pool, column_type_id_[i], &array_builder));
assert(array_builder != nullptr);
RETURN_NOT_OK(array_builder->Reserve(new_size));
new_list_builders.push_back(std::move(array_builder));
list_idx++;
break;
}
case arrow::LargeListType::type_id: {
auto child_type =
std::dynamic_pointer_cast<arrow::LargeListType>(column_type_id_[i])
->value_type();
std::unique_ptr<arrow::ArrayBuilder> child_builder;
RETURN_NOT_OK(MakeBuilder(options_.memory_pool, child_type, &child_builder));
auto builder = std::make_shared<arrow::LargeListBuilder>(
options_.memory_pool, std::move(child_builder));
assert(builder != nullptr);
RETURN_NOT_OK(builder->Reserve(new_size));
new_large_list_builders.push_back(std::move(builder));
large_list_idx++;
std::unique_ptr<arrow::ArrayBuilder> array_builder;
RETURN_NOT_OK(
MakeBuilder(options_.memory_pool, column_type_id_[i], &array_builder));
assert(array_builder != nullptr);
RETURN_NOT_OK(array_builder->Reserve(new_size));
new_list_builders.push_back(std::move(array_builder));
list_idx++;
break;
}
case arrow::NullType::type_id:
Expand Down Expand Up @@ -687,7 +673,6 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
// point to newly allocated buffers
fixed_width_idx = binary_idx = large_binary_idx = 0;
list_idx = 0;
large_list_idx = 0;
for (auto i = 0; i < num_fields; ++i) {
switch (column_type_id_[i]->id()) {
case arrow::BinaryType::type_id:
Expand All @@ -708,9 +693,9 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
list_idx++;
break;
case arrow::LargeListType::type_id:
partition_large_list_builders_[large_list_idx][partition_id] =
std::move(new_large_list_builders[large_list_idx]);
large_list_idx++;
partition_list_builders_[list_idx][partition_id] =
std::move(new_list_builders[list_idx]);
list_idx++;
break;
case arrow::NullType::type_id:
break;
Expand Down Expand Up @@ -896,7 +881,6 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
RETURN_NOT_OK(SplitBinaryArray(rb));
RETURN_NOT_OK(SplitLargeBinaryArray(rb));
RETURN_NOT_OK(SplitListArray(rb));
RETURN_NOT_OK(SplitLargeListArray(rb));

// update partition buffer base
for (auto pid = 0; pid < num_partitions_; ++pid) {
Expand Down Expand Up @@ -1234,46 +1218,13 @@ arrow::Status Splitter::SplitListArray(const arrow::RecordBatch& rb) {
for (int i = 0; i < list_array_idx_.size(); ++i) {
auto src_arr =
std::static_pointer_cast<arrow::ListArray>(rb.column(list_array_idx_[i]));
switch (src_arr->value_type()->id()) {
#define PROCESS(InType) \
case InType::type_id: { \
auto status = AppendList<arrow::ListType, InType>( \
src_arr, partition_list_builders_[i], rb.num_rows()); \
if (!status.ok()) return status; \
} break;
PROCESS_SUPPORTED_TYPES(PROCESS)
#undef PROCESS
default: {
return arrow::Status::NotImplemented(
"AppendList internal type not supported, type is ",
src_arr->value_type()->ToString());
} break;
}
auto status = AppendList(rb.column(list_array_idx_[i]), partition_list_builders_[i],
rb.num_rows());
if (!status.ok()) return status;
}
return arrow::Status::OK();
}

arrow::Status Splitter::SplitLargeListArray(const arrow::RecordBatch& rb) {
for (int i = 0; i < large_list_array_idx_.size(); ++i) {
auto src_arr = std::static_pointer_cast<arrow::LargeListArray>(
rb.column(large_list_array_idx_[i]));
switch (src_arr->value_type()->id()) {
#define PROCESS(InType) \
case InType::type_id: { \
return AppendList<arrow::LargeListType, InType>( \
src_arr, partition_list_builders_[i], rb.num_rows()); \
} break;
PROCESS_SUPPORTED_TYPES(PROCESS)
#undef PROCESS
default: {
return arrow::Status::NotImplemented(
"AppendList internal type not supported, type is ",
src_arr->value_type()->ToString());
} break;
}
}
return arrow::Status::OK();
}
#undef PROCESS_SUPPORTED_TYPES

template <typename T, typename ArrayType, typename BuilderType>
Expand Down Expand Up @@ -1307,51 +1258,13 @@ arrow::Status Splitter::AppendBinary(
return arrow::Status::OK();
}

template <typename T, typename ValueType, typename ArrayType, typename BuilderType>
arrow::Status Splitter::AppendList(
const std::shared_ptr<ArrayType>& src_arr,
const std::vector<std::shared_ptr<BuilderType>>& dst_builders, int64_t num_rows) {
using offset_type = typename T::offset_type;
using ValueBuilderType = typename arrow::TypeTraits<ValueType>::BuilderType;
using ValueArrayType = typename arrow::TypeTraits<ValueType>::ArrayType;
std::vector<ValueBuilderType*> dst_values_builders;
dst_values_builders.resize(dst_builders.size());
for (auto i = 0; i < dst_builders.size(); ++i) {
if (dst_builders[i] != nullptr)
dst_values_builders[i] =
checked_cast<ValueBuilderType*>(dst_builders[i]->value_builder());
}
auto src_arr_values = std::dynamic_pointer_cast<ValueArrayType>(src_arr->values());

if (src_arr->values()->null_count() == 0) {
for (auto row = 0; row < num_rows; ++row) {
auto src_arr_values_offset = src_arr->value_offset(row);
auto src_arr_values_length = src_arr->value_offset(row + 1) - src_arr_values_offset;
RETURN_NOT_OK(dst_builders[partition_id_[row]]->Append());
for (auto i = 0; i < src_arr_values_length; i++) {
RETURN_NOT_OK(dst_values_builders[partition_id_[row]]->Append(
src_arr_values->GetView(src_arr_values_offset + i)));
}
}
} else {
for (auto row = 0; row < num_rows; ++row) {
if (src_arr->IsValid(row)) {
auto src_arr_values_offset = src_arr->value_offset(row);
auto src_arr_values_length =
src_arr->value_offset(row + 1) - src_arr_values_offset;
RETURN_NOT_OK(dst_builders[partition_id_[row]]->Append());
for (auto i = 0; i < src_arr_values_length; i++) {
if (src_arr_values->IsValid(src_arr_values_offset + i)) {
RETURN_NOT_OK(dst_values_builders[partition_id_[row]]->Append(
src_arr_values->GetView(src_arr_values_offset + i)));
} else {
RETURN_NOT_OK(dst_values_builders[partition_id_[row]]->AppendNull());
}
}
} else {
RETURN_NOT_OK(dst_builders[partition_id_[row]]->AppendNull());
}
}
const std::shared_ptr<arrow::Array>& src_arr,
const std::vector<std::shared_ptr<arrow::ArrayBuilder>>& dst_builders,
int64_t num_rows) {
for (auto row = 0; row < num_rows; ++row) {
RETURN_NOT_OK(dst_builders[partition_id_[row]]->AppendArraySlice(
*(src_arr->data().get()), row, 1));
}
return arrow::Status::OK();
}
Expand Down
18 changes: 6 additions & 12 deletions native-sql-engine/cpp/src/shuffle/splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,16 @@ class Splitter {

arrow::Status SplitListArray(const arrow::RecordBatch& rb);

arrow::Status SplitLargeListArray(const arrow::RecordBatch& rb);

template <typename T, typename ArrayType = typename arrow::TypeTraits<T>::ArrayType,
typename BuilderType = typename arrow::TypeTraits<T>::BuilderType>
arrow::Status AppendBinary(
const std::shared_ptr<ArrayType>& src_arr,
const std::vector<std::shared_ptr<BuilderType>>& dst_builders, int64_t num_rows);

template <typename T, typename ValueType,
typename ArrayType = typename arrow::TypeTraits<T>::ArrayType,
typename BuilderType = typename arrow::TypeTraits<T>::BuilderType>
arrow::Status AppendList(const std::shared_ptr<ArrayType>& src_arr,
const std::vector<std::shared_ptr<BuilderType>>& dst_builders,
int64_t num_rows);
arrow::Status AppendList(
const std::shared_ptr<arrow::Array>& src_arr,
const std::vector<std::shared_ptr<arrow::ArrayBuilder>>& dst_builders,
int64_t num_rows);

// Cache the partition buffer/builder as compressed record batch. If reset
// buffers, the partition buffer/builder will be set to nullptr. Two cases for
Expand Down Expand Up @@ -173,6 +169,7 @@ class Splitter {
std::vector<int32_t> partition_buffer_size_;
std::vector<int32_t> partition_buffer_idx_base_;
std::vector<int32_t> partition_buffer_idx_offset_;

std::vector<std::shared_ptr<PartitionWriter>> partition_writer_;
std::vector<std::vector<uint8_t*>> partition_fixed_width_validity_addrs_;
std::vector<std::vector<uint8_t*>> partition_fixed_width_value_addrs_;
Expand All @@ -182,9 +179,7 @@ class Splitter {
partition_binary_builders_;
std::vector<std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>>>
partition_large_binary_builders_;
std::vector<std::vector<std::shared_ptr<arrow::ListBuilder>>> partition_list_builders_;
std::vector<std::vector<std::shared_ptr<arrow::LargeListBuilder>>>
partition_large_list_builders_;
std::vector<std::vector<std::shared_ptr<arrow::ArrayBuilder>>> partition_list_builders_;
std::vector<std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>>
partition_cached_recordbatch_;
std::vector<int64_t> partition_cached_recordbatch_size_; // in bytes
Expand All @@ -193,7 +188,6 @@ class Splitter {
std::vector<int32_t> binary_array_idx_;
std::vector<int32_t> large_binary_array_idx_;
std::vector<int32_t> list_array_idx_;
std::vector<int32_t> large_list_array_idx_;

bool empirical_size_calculated_ = false;
std::vector<int32_t> binary_array_empirical_size_;
Expand Down
Loading

0 comments on commit 07020ba

Please sign in to comment.