Skip to content

Commit

Permalink
[NSE-602] Fix Array type shuffle split segmentation fault (oap-projec…
Browse files Browse the repository at this point in the history
…t#623)

* [NSE-602] Fix Array type shuffle split segmentation fault

* Fix clang code format
  • Loading branch information
zhixingheyi-tian authored Jan 12, 2022
1 parent c4052de commit cdea1af
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ case class ColumnarShuffleExchangeExec(
// check input datatype
for (attr <- child.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
ConverterUtils.createArrowField(attr)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
Expand Down
8 changes: 5 additions & 3 deletions native-sql-engine/cpp/src/shuffle/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1315,9 +1315,11 @@ arrow::Status Splitter::AppendList(
using ValueBuilderType = typename arrow::TypeTraits<ValueType>::BuilderType;
using ValueArrayType = typename arrow::TypeTraits<ValueType>::ArrayType;
std::vector<ValueBuilderType*> dst_values_builders;
for (auto builder : dst_builders) {
dst_values_builders.push_back(
checked_cast<ValueBuilderType*>(builder->value_builder()));
dst_values_builders.resize(dst_builders.size());
for (auto i = 0; i < dst_builders.size(); ++i) {
if (dst_builders[i] != nullptr)
dst_values_builders[i] =
checked_cast<ValueBuilderType*>(dst_builders[i]->value_builder());
}
auto src_arr_values = std::dynamic_pointer_cast<ValueArrayType>(src_arr->values());

Expand Down
45 changes: 45 additions & 0 deletions native-sql-engine/cpp/src/tests/shuffle_split_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,51 @@ TEST_F(SplitterTest, TestRoundRobinListArraySplitter) {
}
}

// Hash-partitions a two-row batch (a uint64 key column plus a list<utf8>
// column) into five partitions, so at least three partitions receive no rows.
// Afterwards the written data file is read back and every batch is checked for
// a consistent shape.
TEST_F(SplitterTest, TestHashListArraySplitterWithMorePartitions) {
  int32_t num_partitions = 5;
  split_options_.buffer_size = 4;

  // Schema: the hash key column followed by the list-of-strings column.
  auto f_uint64 = field("f_uint64", arrow::uint64());
  auto f_arr_str = field("f_arr", arrow::list(arrow::utf8()));
  auto rb_schema = arrow::schema({f_uint64, f_arr_str});

  // One JSON document per column; two rows in total.
  const std::vector<std::string> input_json = {
      R"([1, 2])", R"([["alice0", "bob1"], ["alice2"]])"};
  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
  MakeInputBatch(input_json, rb_schema, &input_batch_arr);

  // The partitioning expression is simply the f_uint64 column itself.
  auto key_node = TreeExprBuilder::MakeField(f_uint64);
  auto hash_expr =
      TreeExprBuilder::MakeExpression(key_node, field("f_uint64", uint64()));

  ARROW_ASSIGN_OR_THROW(
      splitter_,
      Splitter::Make("hash", rb_schema, num_partitions, {hash_expr}, split_options_));

  ASSERT_NOT_OK(splitter_->Split(*input_batch_arr));
  ASSERT_NOT_OK(splitter_->Stop());

  // One length entry is expected per partition.
  const auto& lengths = splitter_->PartitionLengths();
  ASSERT_EQ(lengths.size(), 5);

  CheckFileExsists(splitter_->DataFile());

  // Read the data file back through a record batch stream reader.
  std::shared_ptr<arrow::ipc::RecordBatchReader> file_reader;
  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
  ASSERT_EQ(*file_reader->schema(), *rb_schema);

  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
  ASSERT_NOT_OK(file_reader->ReadAll(&batches));

  // Every batch must carry all schema columns, each as long as the batch.
  for (const auto& rb : batches) {
    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
    for (int i = 0; i < rb->num_columns(); ++i) {
      ASSERT_EQ(rb->column(i)->length(), rb->num_rows());
    }
  }
}

TEST_F(SplitterTest, TestRoundRobinListArraySplitterwithCompression) {
auto f_arr_str = field("f_arr", arrow::list(arrow::utf8()));
auto f_arr_bool = field("f_bool", arrow::list(arrow::boolean()));
Expand Down

0 comments on commit cdea1af

Please sign in to comment.