diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_appender.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_appender.h index 0dc81d2a5..44ce32dc0 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_appender.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_appender.h @@ -68,6 +68,10 @@ class AppenderBase { return arrow::Status::NotImplemented("AppenderBase Finish is abstract."); } + virtual arrow::Status Reserve(uint64_t) { + return arrow::Status::NotImplemented("AppenderBase Reset is abstract."); + } + virtual arrow::Status Reset() { return arrow::Status::NotImplemented("AppenderBase Reset is abstract."); } @@ -469,6 +473,422 @@ static arrow::Status MakeAppender(arrow::compute::ExecContext* ctx, } #undef PROCESS_SUPPORTED_TYPES +/// unsafe appender //// +template +class UnsafeArrayAppender {}; + +template +class UnsafeArrayAppender> + : public AppenderBase { + public: + UnsafeArrayAppender(arrow::compute::ExecContext* ctx, AppenderType type = left) + : ctx_(ctx), type_(type) { + std::unique_ptr array_builder; + arrow::MakeBuilder(ctx_->memory_pool(), arrow::TypeTraits::type_singleton(), + &array_builder); + builder_.reset(arrow::internal::checked_cast(array_builder.release())); + } + ~UnsafeArrayAppender() {} + + AppenderType GetType() override { return type_; } + arrow::Status AddArray(const std::shared_ptr& arr) override { + auto typed_arr_ = std::dynamic_pointer_cast(arr); + cached_arr_.emplace_back(typed_arr_); + if (typed_arr_->null_count() > 0) has_null_ = true; + return arrow::Status::OK(); + } + + arrow::Status PopArray() override { + cached_arr_.pop_back(); + has_null_ = false; + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override { + if (has_null_ && cached_arr_[array_id]->null_count() > 0 && + cached_arr_[array_id]->IsNull(item_id)) { + builder_->UnsafeAppendNull(); + } else { + builder_->UnsafeAppend(cached_arr_[array_id]->GetView(item_id)); + } + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id, + int repeated) override { + if (repeated == 0) return arrow::Status::OK(); + if (has_null_ && cached_arr_[array_id]->null_count() > 0 && + cached_arr_[array_id]->IsNull(item_id)) { + // TODO: unloop here and use unsafeappend + RETURN_NOT_OK(builder_->AppendNulls(repeated)); + } else { + auto val = cached_arr_[array_id]->GetView(item_id); + std::vector values; + values.resize(repeated, val); + // TODO: unloop here and use unsafeappend + RETURN_NOT_OK(builder_->AppendValues(values.data(), repeated)); + } + return arrow::Status::OK(); + } + + arrow::Status Append(const std::vector& index_list) { + for (auto tmp : index_list) { + if (has_null_ && cached_arr_[tmp.array_id]->null_count() > 0 && + cached_arr_[tmp.array_id]->IsNull(tmp.id)) { + builder_->AppendNull(); + } else { + builder_->UnsafeAppend(cached_arr_[tmp.array_id]->GetView(tmp.id)); + } + } + return arrow::Status::OK(); + } + + arrow::Status AppendNull() override { + // TODO: use unsafe append + return builder_->AppendNull(); + } + + arrow::Status Finish(std::shared_ptr* out_) override { + auto status = builder_->Finish(out_); + return status; + } + + arrow::Status Reserve(uint64_t len) override { + builder_->Reserve(len); + return arrow::Status::OK(); + } + + arrow::Status Reset() override { + builder_->Reset(); + return arrow::Status::OK(); + } + + private: + using BuilderType_ = typename arrow::TypeTraits::BuilderType; + using ArrayType_ = typename arrow::TypeTraits::ArrayType; + using CType = typename arrow::TypeTraits::CType; + std::unique_ptr builder_; + std::vector> cached_arr_; + arrow::compute::ExecContext* ctx_; + AppenderType type_; + bool has_null_ = false; +}; + +// TODO(): this is a fake unsafeappende for string array +template +class UnsafeArrayAppender> + : public AppenderBase { + public: + UnsafeArrayAppender(arrow::compute::ExecContext* ctx, AppenderType type = left) + : ctx_(ctx), type_(type) { + std::unique_ptr array_builder; + arrow::MakeBuilder(ctx_->memory_pool(), arrow::TypeTraits::type_singleton(), + &array_builder); + builder_.reset(arrow::internal::checked_cast(array_builder.release())); + } + ~UnsafeArrayAppender() {} + + AppenderType GetType() override { return type_; } + arrow::Status AddArray(const std::shared_ptr& arr) override { + auto typed_arr_ = std::dynamic_pointer_cast(arr); + cached_arr_.emplace_back(typed_arr_); + if (typed_arr_->null_count() > 0) has_null_ = true; + return arrow::Status::OK(); + } + + arrow::Status PopArray() override { + cached_arr_.pop_back(); + has_null_ = false; + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override { + if (has_null_ && cached_arr_[array_id]->null_count() > 0 && + cached_arr_[array_id]->IsNull(item_id)) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id))); + } + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id, + int repeated) override { + if (repeated == 0) return arrow::Status::OK(); + if (has_null_ && cached_arr_[array_id]->null_count() > 0 && + cached_arr_[array_id]->IsNull(item_id)) { + RETURN_NOT_OK(builder_->AppendNulls(repeated)); + } else { + auto val = cached_arr_[array_id]->GetView(item_id); + for (int i = 0; i < repeated; i++) { + RETURN_NOT_OK(builder_->Append(val)); + } + } + return arrow::Status::OK(); + } + + arrow::Status Append(const std::vector& index_list) { + for (auto tmp : index_list) { + if (has_null_ && cached_arr_[tmp.array_id]->null_count() > 0 && + cached_arr_[tmp.array_id]->IsNull(tmp.id)) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + RETURN_NOT_OK(builder_->Append(cached_arr_[tmp.array_id]->GetView(tmp.id))); + } + } + return arrow::Status::OK(); + } + + arrow::Status AppendNull() override { return builder_->AppendNull(); } + + arrow::Status Finish(std::shared_ptr* out_) override { + auto status = builder_->Finish(out_); + return status; + } + + arrow::Status Reserve(uint64_t len) override { + // builder_->Reserve(len); + return arrow::Status::OK(); + } + + arrow::Status Reset() override { + builder_->Reset(); + return arrow::Status::OK(); + } + + private: + using BuilderType_ = typename arrow::TypeTraits::BuilderType; + using ArrayType_ = typename arrow::TypeTraits::ArrayType; + std::unique_ptr builder_; + std::vector> cached_arr_; + arrow::compute::ExecContext* ctx_; + AppenderType type_; + bool has_null_ = false; +}; + +// TOOD(): this is a fake unsafeappender for boolean array +template +class UnsafeArrayAppender> + : public AppenderBase { + public: + UnsafeArrayAppender(arrow::compute::ExecContext* ctx, AppenderType type = left) + : ctx_(ctx), type_(type) { + std::unique_ptr array_builder; + arrow::MakeBuilder(ctx_->memory_pool(), arrow::TypeTraits::type_singleton(), + &array_builder); + builder_.reset(arrow::internal::checked_cast(array_builder.release())); + } + ~UnsafeArrayAppender() {} + + AppenderType GetType() override { return type_; } + arrow::Status AddArray(const std::shared_ptr& arr) override { + auto typed_arr_ = std::dynamic_pointer_cast(arr); + cached_arr_.emplace_back(typed_arr_); + if (typed_arr_->null_count() > 0) has_null_ = true; + return arrow::Status::OK(); + } + + arrow::Status PopArray() override { + cached_arr_.pop_back(); + has_null_ = false; + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override { + if (has_null_ && cached_arr_[array_id]->null_count() > 0 && + cached_arr_[array_id]->IsNull(item_id)) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id))); + } + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id, + int repeated) override { + if (repeated == 0) return arrow::Status::OK(); + if (has_null_ && cached_arr_[array_id]->null_count() > 0 && + cached_arr_[array_id]->IsNull(item_id)) { + RETURN_NOT_OK(builder_->AppendNulls(repeated)); + } else { + auto val = cached_arr_[array_id]->GetView(item_id); + for (int i = 0; i < repeated; i++) { + RETURN_NOT_OK(builder_->Append(val)); + } + } + return arrow::Status::OK(); + } + + arrow::Status Append(const std::vector& index_list) { + for (auto tmp : index_list) { + if (has_null_ && cached_arr_[tmp.array_id]->null_count() > 0 && + cached_arr_[tmp.array_id]->IsNull(tmp.id)) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + RETURN_NOT_OK(builder_->Append(cached_arr_[tmp.array_id]->GetView(tmp.id))); + } + } + return arrow::Status::OK(); + } + + arrow::Status AppendNull() override { return builder_->AppendNull(); } + + arrow::Status AppendExistence(bool is_exist) { return builder_->Append(is_exist); } + + arrow::Status Finish(std::shared_ptr* out_) override { + auto status = builder_->Finish(out_); + return status; + } + + arrow::Status Reserve(uint64_t len) override { + // builder_->Reserve(len); + return arrow::Status::OK(); + } + + arrow::Status Reset() override { + builder_->Reset(); + return arrow::Status::OK(); + } + + private: + using BuilderType_ = typename arrow::TypeTraits::BuilderType; + using ArrayType_ = typename arrow::TypeTraits::ArrayType; + std::unique_ptr builder_; + std::vector> cached_arr_; + arrow::compute::ExecContext* ctx_; + AppenderType type_; + bool has_null_ = false; +}; + +template +class UnsafeArrayAppender> : public AppenderBase { + public: + UnsafeArrayAppender(arrow::compute::ExecContext* ctx, + std::shared_ptr data_type, + AppenderType type = left) + : ctx_(ctx), type_(type) { + std::unique_ptr array_builder; + arrow::MakeBuilder(ctx_->memory_pool(), data_type, &array_builder); + builder_.reset(arrow::internal::checked_cast(array_builder.release())); + } + ~UnsafeArrayAppender() {} + + AppenderType GetType() override { return type_; } + arrow::Status AddArray(const std::shared_ptr& arr) override { + auto typed_arr_ = std::dynamic_pointer_cast(arr); + cached_arr_.emplace_back(typed_arr_); + if (typed_arr_->null_count() > 0) has_null_ = true; + return arrow::Status::OK(); + } + + arrow::Status PopArray() override { + cached_arr_.pop_back(); + has_null_ = false; + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override { + if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) { + builder_->UnsafeAppendNull(); + } else { + builder_->UnsafeAppend(cached_arr_[array_id]->GetView(item_id)); + } + return arrow::Status::OK(); + } + + arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id, + int repeated) override { + if (repeated == 0) return arrow::Status::OK(); + if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) { + RETURN_NOT_OK(builder_->AppendNulls(repeated)); + } else { + auto val = cached_arr_[array_id]->GetView(item_id); + for (int i = 0; i < repeated; i++) { + RETURN_NOT_OK(builder_->Append(val)); + } + } + return arrow::Status::OK(); + } + + arrow::Status Append(const std::vector& index_list) { + for (auto tmp : index_list) { + if (has_null_ && cached_arr_[tmp.array_id]->IsNull(tmp.id)) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + RETURN_NOT_OK(builder_->Append(cached_arr_[tmp.array_id]->GetView(tmp.id))); + } + } + return arrow::Status::OK(); + } + + arrow::Status AppendNull() override { return builder_->AppendNull(); } + + arrow::Status Finish(std::shared_ptr* out_) override { + auto status = builder_->Finish(out_); + return status; + } + + arrow::Status Reserve(uint64_t len) override { + builder_->Reserve(len); + return arrow::Status::OK(); + } + + arrow::Status Reset() override { + builder_->Reset(); + return arrow::Status::OK(); + } + + private: + using BuilderType_ = typename arrow::TypeTraits::BuilderType; + using ArrayType_ = typename arrow::TypeTraits::ArrayType; + std::unique_ptr builder_; + std::vector> cached_arr_; + arrow::compute::ExecContext* ctx_; + AppenderType type_; + bool has_null_ = false; +}; + +#define PROCESS_SUPPORTED_TYPES(PROCESS) \ + PROCESS(arrow::BooleanType) \ + PROCESS(arrow::UInt8Type) \ + PROCESS(arrow::Int8Type) \ + PROCESS(arrow::UInt16Type) \ + PROCESS(arrow::Int16Type) \ + PROCESS(arrow::UInt32Type) \ + PROCESS(arrow::Int32Type) \ + PROCESS(arrow::UInt64Type) \ + PROCESS(arrow::Int64Type) \ + PROCESS(arrow::FloatType) \ + PROCESS(arrow::DoubleType) \ + PROCESS(arrow::Date32Type) \ + PROCESS(arrow::Date64Type) \ + PROCESS(arrow::StringType) +static arrow::Status MakeUnsafeAppender(arrow::compute::ExecContext* ctx, + std::shared_ptr type, + AppenderBase::AppenderType appender_type, + std::shared_ptr* out) { + switch (type->id()) { +#define PROCESS(InType) \ + case InType::type_id: { \ + auto app_ptr = std::make_shared>(ctx, appender_type); \ + *out = std::dynamic_pointer_cast(app_ptr); \ + } break; + PROCESS_SUPPORTED_TYPES(PROCESS) +#undef PROCESS + case arrow::Decimal128Type::type_id: { + auto app_ptr = std::make_shared>( + ctx, type, appender_type); + *out = std::dynamic_pointer_cast(app_ptr); + } break; + default: { + return arrow::Status::NotImplemented("MakeAppender type not supported, type is ", + type->ToString()); + } break; + } + return arrow::Status::OK(); +} +#undef PROCESS_SUPPORTED_TYPES + } // namespace extra } // namespace arrowcompute } // namespace codegen diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index 38c8aac12..98f89ab2b 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -664,7 +664,7 @@ extern "C" void MakeCodeGen(arrow::compute::ExecContext* ctx, for (int i = 0; i < col_num_; i++) { auto field = schema->field(i); std::shared_ptr appender; - THROW_NOT_OK(MakeAppender(ctx_, field->type(), appender_type, &appender)); + THROW_NOT_OK(MakeUnsafeAppender(ctx_, field->type(), appender_type, &appender)); appender_list_.push_back(appender); } for (int i = 0; i < col_num_; i++) { @@ -693,6 +693,7 @@ extern "C" void MakeCodeGen(arrow::compute::ExecContext* ctx, : (total_length_ - offset_); uint64_t count = 0; for (int i = 0; i < col_num_; i++) { + RETURN_NOT_OK(appender_list_[i]->Reserve(length)); while (count < length) { auto item = indices_begin_ + offset_ + count++; RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id));