diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc
index 40fec7d13..c3eea6bc2 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.cc
+++ b/native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -296,7 +296,7 @@ arrow::Status Splitter::Init() {
       list_array_idx_.push_back(i);
       break;
     case arrow::LargeListType::type_id:
-      large_list_array_idx_.push_back(i);
+      list_array_idx_.push_back(i);
       break;
     case arrow::NullType::type_id:
       break;
@@ -330,10 +330,6 @@ arrow::Status Splitter::Init() {
  for (auto i = 0; i < list_array_idx_.size(); ++i) {
    partition_list_builders_[i].resize(num_partitions_);
  }
-  partition_large_list_builders_.resize(large_list_array_idx_.size());
-  for (auto i = 0; i < large_list_array_idx_.size(); ++i) {
-    partition_large_list_builders_[i].resize(num_partitions_);
-  }

  ARROW_ASSIGN_OR_RAISE(configured_dirs_, GetConfiguredLocalDirs());
  sub_dir_selection_.assign(configured_dirs_.size(), 0);
@@ -476,7 +472,6 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
  auto binary_idx = 0;
  auto large_binary_idx = 0;
  auto list_idx = 0;
-  auto large_list_idx = 0;
  auto num_fields = schema_->num_fields();
  auto num_rows = partition_buffer_idx_base_[partition_id];
  auto buffer_sizes = 0;
@@ -530,7 +525,7 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
      break;
    }
    case arrow::LargeListType::type_id: {
-      auto& builder = partition_large_list_builders_[large_list_idx][partition_id];
+      auto& builder = partition_list_builders_[list_idx][partition_id];
      if (reset_buffers) {
        RETURN_NOT_OK(builder->Finish(&arrays[i]));
        builder->Reset();
@@ -539,7 +534,7 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
        builder->Reset();
        RETURN_NOT_OK(builder->Reserve(num_rows));
      }
-      large_list_idx++;
+      list_idx++;
      break;
    }
    case arrow::NullType::type_id: {
@@ -593,12 +588,10 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
  auto binary_idx = 0;
  auto large_binary_idx = 0;
  auto list_idx = 0;
-  auto large_list_idx = 0;

  std::vector<std::shared_ptr<arrow::BinaryBuilder>> new_binary_builders;
  std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>> new_large_binary_builders;
-  std::vector<std::shared_ptr<arrow::ListBuilder>> new_list_builders;
-  std::vector<std::shared_ptr<arrow::LargeListBuilder>> new_large_list_builders;
+  std::vector<std::shared_ptr<arrow::ArrayBuilder>> new_list_builders;
  std::vector<std::shared_ptr<arrow::ResizableBuffer>> new_value_buffers;
  std::vector<std::shared_ptr<arrow::ResizableBuffer>> new_validity_buffers;
  for (auto i = 0; i < num_fields; ++i) {
@@ -626,30 +619,23 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
      break;
    }
    case arrow::ListType::type_id: {
-      auto child_type =
-          std::dynamic_pointer_cast<arrow::ListType>(column_type_id_[i])->value_type();
-      std::unique_ptr<arrow::ArrayBuilder> child_builder;
-      RETURN_NOT_OK(MakeBuilder(options_.memory_pool, child_type, &child_builder));
-      auto builder = std::make_shared<arrow::ListBuilder>(options_.memory_pool,
-                                                          std::move(child_builder));
-      assert(builder != nullptr);
-      RETURN_NOT_OK(builder->Reserve(new_size));
-      new_list_builders.push_back(std::move(builder));
+      std::unique_ptr<arrow::ArrayBuilder> array_builder;
+      RETURN_NOT_OK(
+          MakeBuilder(options_.memory_pool, column_type_id_[i], &array_builder));
+      assert(array_builder != nullptr);
+      RETURN_NOT_OK(array_builder->Reserve(new_size));
+      new_list_builders.push_back(std::move(array_builder));
      list_idx++;
      break;
    }
    case arrow::LargeListType::type_id: {
-      auto child_type =
-          std::dynamic_pointer_cast<arrow::LargeListType>(column_type_id_[i])
-              ->value_type();
-      std::unique_ptr<arrow::ArrayBuilder> child_builder;
-      RETURN_NOT_OK(MakeBuilder(options_.memory_pool, child_type, &child_builder));
-      auto builder = std::make_shared<arrow::LargeListBuilder>(
-          options_.memory_pool, std::move(child_builder));
-      assert(builder != nullptr);
-      RETURN_NOT_OK(builder->Reserve(new_size));
-      new_large_list_builders.push_back(std::move(builder));
-      large_list_idx++;
+      std::unique_ptr<arrow::ArrayBuilder> array_builder;
+      RETURN_NOT_OK(
+          MakeBuilder(options_.memory_pool, column_type_id_[i], &array_builder));
+      assert(array_builder != nullptr);
+      RETURN_NOT_OK(array_builder->Reserve(new_size));
+      new_list_builders.push_back(std::move(array_builder));
+      list_idx++;
      break;
    }
    case arrow::NullType::type_id:
@@ -687,7 +673,6 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
  // point to newly allocated buffers
  fixed_width_idx = binary_idx = large_binary_idx = 0;
  list_idx = 0;
-  large_list_idx = 0;
  for (auto i = 0; i < num_fields; ++i) {
    switch (column_type_id_[i]->id()) {
      case arrow::BinaryType::type_id:
@@ -708,9 +693,9 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
        list_idx++;
        break;
      case arrow::LargeListType::type_id:
-        partition_large_list_builders_[large_list_idx][partition_id] =
-            std::move(new_large_list_builders[large_list_idx]);
-        large_list_idx++;
+        partition_list_builders_[list_idx][partition_id] =
+            std::move(new_list_builders[list_idx]);
+        list_idx++;
        break;
      case arrow::NullType::type_id:
        break;
@@ -896,7 +881,6 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
  RETURN_NOT_OK(SplitBinaryArray(rb));
  RETURN_NOT_OK(SplitLargeBinaryArray(rb));
  RETURN_NOT_OK(SplitListArray(rb));
-  RETURN_NOT_OK(SplitLargeListArray(rb));

  // update partition buffer base
  for (auto pid = 0; pid < num_partitions_; ++pid) {
@@ -1234,46 +1218,13 @@ arrow::Status Splitter::SplitListArray(const arrow::RecordBatch& rb) {
  for (int i = 0; i < list_array_idx_.size(); ++i) {
-    auto src_arr =
-        std::static_pointer_cast<arrow::ListArray>(rb.column(list_array_idx_[i]));
-    switch (src_arr->value_type()->id()) {
-#define PROCESS(InType)                                          \
-  case InType::type_id: {                                        \
-    auto status = AppendList<arrow::ListType, InType>(           \
-        src_arr, partition_list_builders_[i], rb.num_rows());    \
-    if (!status.ok()) return status;                             \
-  } break;
-      PROCESS_SUPPORTED_TYPES(PROCESS)
-#undef PROCESS
-      default: {
-        return arrow::Status::NotImplemented(
-            "AppendList internal type not supported, type is ",
-            src_arr->value_type()->ToString());
-      } break;
-    }
+    auto status = AppendList(rb.column(list_array_idx_[i]), partition_list_builders_[i],
+                             rb.num_rows());
+    if (!status.ok()) return status;
  }
  return arrow::Status::OK();
}

-arrow::Status Splitter::SplitLargeListArray(const arrow::RecordBatch& rb) {
-  for (int i = 0; i < large_list_array_idx_.size(); ++i) {
-    auto src_arr = std::static_pointer_cast<arrow::LargeListArray>(
-        rb.column(large_list_array_idx_[i]));
-    switch (src_arr->value_type()->id()) {
-#define PROCESS(InType)                                          \
-  case InType::type_id: {                                        \
-    return AppendList<arrow::LargeListType, InType>(             \
-        src_arr, partition_list_builders_[i], rb.num_rows());    \
-  } break;
-      PROCESS_SUPPORTED_TYPES(PROCESS)
-#undef PROCESS
-      default: {
-        return arrow::Status::NotImplemented(
-            "AppendList internal type not supported, type is ",
-            src_arr->value_type()->ToString());
-      } break;
-    }
-  }
-  return arrow::Status::OK();
-}
-
 #undef PROCESS_SUPPORTED_TYPES

 template <typename T, typename ArrayType, typename BuilderType>
@@ -1307,51 +1258,13 @@ arrow::Status Splitter::AppendBinary(
  return arrow::Status::OK();
}

-template <typename T, typename ValueType, typename ArrayType, typename BuilderType>
 arrow::Status Splitter::AppendList(
-    const std::shared_ptr<ArrayType>& src_arr,
-    const std::vector<std::shared_ptr<BuilderType>>& dst_builders, int64_t num_rows) {
-  using offset_type = typename T::offset_type;
-  using ValueBuilderType = typename arrow::TypeTraits<ValueType>::BuilderType;
-  using ValueArrayType = typename arrow::TypeTraits<ValueType>::ArrayType;
-  std::vector<ValueBuilderType*> dst_values_builders;
-  dst_values_builders.resize(dst_builders.size());
-  for (auto i = 0; i < dst_builders.size(); ++i) {
-    if (dst_builders[i] != nullptr)
-      dst_values_builders[i] =
-          checked_cast<ValueBuilderType*>(dst_builders[i]->value_builder());
-  }
-  auto src_arr_values = std::dynamic_pointer_cast<ValueArrayType>(src_arr->values());
-
-  if (src_arr->values()->null_count() == 0) {
-    for (auto row = 0; row < num_rows; ++row) {
-      auto src_arr_values_offset = src_arr->value_offset(row);
-      auto src_arr_values_length = src_arr->value_offset(row + 1) - src_arr_values_offset;
-      RETURN_NOT_OK(dst_builders[partition_id_[row]]->Append());
-      for (auto i = 0; i < src_arr_values_length; i++) {
-        RETURN_NOT_OK(dst_values_builders[partition_id_[row]]->Append(
-            src_arr_values->GetView(src_arr_values_offset + i)));
-      }
-    }
-  } else {
-    for (auto row = 0; row < num_rows; ++row) {
-      if (src_arr->IsValid(row)) {
-        auto src_arr_values_offset = src_arr->value_offset(row);
-        auto src_arr_values_length =
-            src_arr->value_offset(row + 1) - src_arr_values_offset;
-        RETURN_NOT_OK(dst_builders[partition_id_[row]]->Append());
-        for (auto i = 0; i < src_arr_values_length; i++) {
-          if (src_arr_values->IsValid(src_arr_values_offset + i)) {
-            RETURN_NOT_OK(dst_values_builders[partition_id_[row]]->Append(
-                src_arr_values->GetView(src_arr_values_offset + i)));
-          } else {
-            RETURN_NOT_OK(dst_values_builders[partition_id_[row]]->AppendNull());
-          }
-        }
-      } else {
-        RETURN_NOT_OK(dst_builders[partition_id_[row]]->AppendNull());
-      }
-    }
+    const std::shared_ptr<arrow::Array>& src_arr,
+    const std::vector<std::shared_ptr<arrow::ArrayBuilder>>& dst_builders,
+    int64_t num_rows) {
+  for (auto row = 0; row < num_rows; ++row) {
+    RETURN_NOT_OK(dst_builders[partition_id_[row]]->AppendArraySlice(
+        *(src_arr->data().get()), row, 1));
  }
  return arrow::Status::OK();
}
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h
index 501aa6f10..77a48bb2d 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.h
+++ b/native-sql-engine/cpp/src/shuffle/splitter.h
@@ -132,20 +132,16 @@ class Splitter {
  arrow::Status SplitListArray(const arrow::RecordBatch& rb);

-  arrow::Status SplitLargeListArray(const arrow::RecordBatch& rb);
-
  template <typename T, typename ArrayType = typename arrow::TypeTraits<T>::ArrayType,
            typename BuilderType = typename arrow::TypeTraits<T>::BuilderType>
  arrow::Status AppendBinary(
      const std::shared_ptr<ArrayType>& src_arr,
      const std::vector<std::shared_ptr<BuilderType>>& dst_builders, int64_t num_rows);

-  template <typename T, typename ValueType, typename ArrayType = typename arrow::TypeTraits<T>::ArrayType,
-            typename BuilderType = typename arrow::TypeTraits<T>::BuilderType>
-  arrow::Status AppendList(const std::shared_ptr<ArrayType>& src_arr,
-                           const std::vector<std::shared_ptr<BuilderType>>& dst_builders,
-                           int64_t num_rows);
+  arrow::Status AppendList(
+      const std::shared_ptr<arrow::Array>& src_arr,
+      const std::vector<std::shared_ptr<arrow::ArrayBuilder>>& dst_builders,
+      int64_t num_rows);

  // Cache the partition buffer/builder as compressed record batch. If reset
  // buffers, the partition buffer/builder will be set to nullptr. Two cases for
@@ -173,6 +169,7 @@ class Splitter {
  std::vector<int32_t> partition_buffer_size_;
  std::vector<int32_t> partition_buffer_idx_base_;
  std::vector<int32_t> partition_buffer_idx_offset_;
+  std::vector<std::shared_ptr<PartitionWriter>> partition_writer_;

  std::vector<std::vector<uint8_t*>> partition_fixed_width_validity_addrs_;
  std::vector<std::vector<uint8_t*>> partition_fixed_width_value_addrs_;
@@ -182,9 +179,7 @@ class Splitter {
      partition_binary_builders_;
  std::vector<std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>>>
      partition_large_binary_builders_;
-  std::vector<std::vector<std::shared_ptr<arrow::ListBuilder>>> partition_list_builders_;
-  std::vector<std::vector<std::shared_ptr<arrow::LargeListBuilder>>>
-      partition_large_list_builders_;
+  std::vector<std::vector<std::shared_ptr<arrow::ArrayBuilder>>> partition_list_builders_;
  std::vector<std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>>
      partition_cached_recordbatch_;
  std::vector<int64_t> partition_cached_recordbatch_size_;  // in bytes
@@ -193,7 +188,6 @@ class Splitter {
  std::vector<int32_t> binary_array_idx_;
  std::vector<int32_t> large_binary_array_idx_;
  std::vector<int32_t> list_array_idx_;
-  std::vector<int32_t> large_list_array_idx_;

  bool empirical_size_calculated_ = false;
  std::vector<uint64_t> binary_array_empirical_size_;
diff --git a/native-sql-engine/cpp/src/tests/shuffle_split_test.cc b/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
index 4fefdbc7b..9b99d6edd 100644
--- a/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
+++ b/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
@@ -525,6 +525,291 @@ TEST_F(SplitterTest, TestRoundRobinListArraySplitter) {
  }
}

+TEST_F(SplitterTest, TestRoundRobinNestListArraySplitter) {
+  auto f_arr_str = field("f_str", arrow::list(arrow::list(arrow::utf8())));
+  auto f_arr_int32 = field("f_int32", arrow::list(arrow::list(arrow::int32())));
+
+  auto rb_schema = arrow::schema({f_arr_str, f_arr_int32});
+
+  const std::vector<std::string> input_data_arr = {
+      R"([[["alice0", "bob1"]], [["alice2"], ["bob3"]], [["Alice4", "Bob5", "AlicE6"]], [["boB7"], [["ALICE8", "BOB9"]]])",
+      R"([[[1, 2, 3]], [[9, 8], [null]], [[3, 1], [0]], [[1, 9, null]]])"};
+
+  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
+  MakeInputBatch(input_data_arr, rb_schema, &input_batch_arr);
+
+  int32_t num_partitions = 2;
+  split_options_.buffer_size = 4;
+  ARROW_ASSIGN_OR_THROW(splitter_,
+                        Splitter::Make("rr", rb_schema, num_partitions, split_options_));
+
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_arr));
+  ASSERT_NOT_OK(splitter_->Stop());
+
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> file_reader;
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+
+  // verify partition lengths
+  const auto& lengths = splitter_->PartitionLengths();
+  ASSERT_EQ(lengths.size(), 2);
+  ASSERT_EQ(*file_->GetSize(), lengths[0] + lengths[1]);
+
+  // verify schema
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+
+  // prepare first block expected result
+  std::shared_ptr<arrow::RecordBatch> res_batch_0;
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[0, 2]"))
+  std::vector<arrow::RecordBatch*> expected = {res_batch_0.get()};
+
+  // verify first block
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+
+  // prepare second block expected result
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[1, 3]"))
+  expected = {res_batch_0.get()};
+
+  // verify second block
+  batches.clear();
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+  ASSERT_NOT_OK(file_->Advance(lengths[0]));
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+}
+
+TEST_F(SplitterTest, TestRoundRobinNestLargeListArraySplitter) {
+  auto f_arr_str = field("f_str", arrow::large_list(arrow::list(arrow::utf8())));
+  auto f_arr_int32 = field("f_int32", arrow::large_list(arrow::list(arrow::int32())));
+
+  auto rb_schema = arrow::schema({f_arr_str, f_arr_int32});
+
+  const std::vector<std::string> input_data_arr = {
+      R"([[["alice0", "bob1"]], [["alice2"], ["bob3"]], [["Alice4", "Bob5", "AlicE6"]], [["boB7"], ["ALICE8", "BOB9"]]])",
+      R"([[[1, 2, 3]], [[9, 8], [null]], [[3, 1], [0]], [[1, 9, null]]])"};
+
+  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
+  MakeInputBatch(input_data_arr, rb_schema, &input_batch_arr);
+
+  int32_t num_partitions = 2;
+  split_options_.buffer_size = 4;
+  ARROW_ASSIGN_OR_THROW(splitter_,
+                        Splitter::Make("rr", rb_schema, num_partitions, split_options_));
+
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_arr));
+  ASSERT_NOT_OK(splitter_->Stop());
+
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> file_reader;
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+
+  // verify partition lengths
+  const auto& lengths = splitter_->PartitionLengths();
+  ASSERT_EQ(lengths.size(), 2);
+  ASSERT_EQ(*file_->GetSize(), lengths[0] + lengths[1]);
+
+  // verify schema
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+
+  // prepare first block expected result
+  std::shared_ptr<arrow::RecordBatch> res_batch_0;
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[0, 2]"))
+  std::vector<arrow::RecordBatch*> expected = {res_batch_0.get()};
+
+  // verify first block
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+
+  // prepare second block expected result
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[1, 3]"))
+  expected = {res_batch_0.get()};
+
+  // verify second block
+  batches.clear();
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+  ASSERT_NOT_OK(file_->Advance(lengths[0]));
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+}
+
+TEST_F(SplitterTest, TestRoundRobinListStructArraySplitter) {
+  auto f_arr_int32 = field("f_int32", arrow::list(arrow::list(arrow::int32())));
+  auto f_arr_list_struct =
+      field("f_list_struct", list(struct_({field("a", int32()), field("b", utf8())})));
+
+  auto rb_schema = arrow::schema({f_arr_int32, f_arr_list_struct});
+
+  const std::vector<std::string> input_data_arr = {
+      R"([[[1, 2, 3]], [[9, 8], [null]], [[3, 1], [0]], [[1, 9, null]]])",
+      R"([[{"a": 4, "b": null}], [{"a": 42, "b": null}, {"a": null, "b": "foo2"}], [{"a": 43, "b": "foo3"}], [{"a": 44, "b": "foo4"}]])"};
+
+  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
+  MakeInputBatch(input_data_arr, rb_schema, &input_batch_arr);
+
+  int32_t num_partitions = 2;
+  split_options_.buffer_size = 4;
+  ARROW_ASSIGN_OR_THROW(splitter_,
+                        Splitter::Make("rr", rb_schema, num_partitions, split_options_));
+
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_arr));
+  ASSERT_NOT_OK(splitter_->Stop());
+
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> file_reader;
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+
+  // verify partition lengths
+  const auto& lengths = splitter_->PartitionLengths();
+  ASSERT_EQ(lengths.size(), 2);
+  ASSERT_EQ(*file_->GetSize(), lengths[0] + lengths[1]);
+
+  // verify schema
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+
+  // prepare first block expected result
+  std::shared_ptr<arrow::RecordBatch> res_batch_0;
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[0, 2]"))
+  std::vector<arrow::RecordBatch*> expected = {res_batch_0.get()};
+
+  // verify first block
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+
+  // prepare second block expected result
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[1, 3]"))
+  expected = {res_batch_0.get()};
+
+  // verify second block
+  batches.clear();
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+  ASSERT_NOT_OK(file_->Advance(lengths[0]));
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+}
+
+TEST_F(SplitterTest, TestRoundRobinListMapArraySplitter) {
+  auto f_arr_int32 = field("f_int32", arrow::list(arrow::list(arrow::int32())));
+  auto f_arr_list_map = field("f_list_map", list(map(utf8(), utf8())));
+
+  auto rb_schema = arrow::schema({f_arr_int32, f_arr_list_map});
+
+  const std::vector<std::string> input_data_arr = {
+      R"([[[1, 2, 3]], [[9, 8], [null]], [[3, 1], [0]], [[1, 9, null]]])",
+      R"([[[["key1", "val_aa1"]]], [[["key1", "val_bb1"]], [["key2", "val_bb2"]]], [[["key1", "val_cc1"]]], [[["key1", "val_dd1"]]]])"};
+
+  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
+  MakeInputBatch(input_data_arr, rb_schema, &input_batch_arr);
+
+  int32_t num_partitions = 2;
+  split_options_.buffer_size = 4;
+  ARROW_ASSIGN_OR_THROW(splitter_,
+                        Splitter::Make("rr", rb_schema, num_partitions, split_options_));
+
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_arr));
+  ASSERT_NOT_OK(splitter_->Stop());
+
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> file_reader;
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+
+  // verify partition lengths
+  const auto& lengths = splitter_->PartitionLengths();
+  ASSERT_EQ(lengths.size(), 2);
+  ASSERT_EQ(*file_->GetSize(), lengths[0] + lengths[1]);
+
+  // verify schema
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+
+  // prepare first block expected result
+  std::shared_ptr<arrow::RecordBatch> res_batch_0;
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[0, 2]"))
+  std::vector<arrow::RecordBatch*> expected = {res_batch_0.get()};
+
+  // verify first block
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+
+  // prepare second block expected result
+  ARROW_ASSIGN_OR_THROW(res_batch_0, TakeRows(input_batch_arr, "[1, 3]"))
+  expected = {res_batch_0.get()};
+
+  // verify second block
+  batches.clear();
+  ARROW_ASSIGN_OR_THROW(file_reader, GetRecordBatchStreamReader(splitter_->DataFile()));
+  ASSERT_EQ(*file_reader->schema(), *rb_schema);
+  ASSERT_NOT_OK(file_->Advance(lengths[0]));
+  ASSERT_NOT_OK(file_reader->ReadAll(&batches));
+  ASSERT_EQ(batches.size(), 1);
+  for (auto i = 0; i < batches.size(); ++i) {
+    const auto& rb = batches[i];
+    ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+    for (auto j = 0; j < rb->num_columns(); ++j) {
+      ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+    }
+    ASSERT_TRUE(rb->Equals(*expected[i]));
+  }
+}
+
 TEST_F(SplitterTest, TestHashListArraySplitterWithMorePartitions) {
  int32_t num_partitions = 5;
  split_options_.buffer_size = 4;
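Note: the heart of this patch is that arrow::ArrayBuilder::AppendArraySlice() copies a
single row of an array of any type, nested children included, so one type-erased loop
replaces the old per-value-type PROCESS macro dispatch and also covers types the macro
never handled (nested lists, lists of structs, lists of maps). Below is a minimal
standalone sketch of the same row-dispatch idea, assuming a recent Arrow C++ release;
the names SplitColumn and partition_id are hypothetical stand-ins, not part of the patch:

    #include <arrow/api.h>

    // Route each row of `column` to the builder of its target partition.
    // The builders are assumed to have been created with arrow::MakeBuilder()
    // for the column's full (possibly nested) type.
    arrow::Status SplitColumn(
        const std::shared_ptr<arrow::Array>& column,
        const std::vector<int32_t>& partition_id,
        std::vector<std::unique_ptr<arrow::ArrayBuilder>>* builders) {
      const arrow::ArrayData& data = *column->data();
      for (int64_t row = 0; row < column->length(); ++row) {
        // AppendArraySlice copies validity, offsets, and child values for one
        // row, regardless of the column's type.
        ARROW_RETURN_NOT_OK(
            (*builders)[partition_id[row]]->AppendArraySlice(data, row, 1));
      }
      return arrow::Status::OK();
    }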