Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Support StringType input and refactor the code
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE committed Sep 28, 2022
1 parent 77fc20a commit a1afd81
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 85 deletions.
12 changes: 2 additions & 10 deletions native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,20 +363,12 @@ class WindowLagKernel : public WindowRankKernel {

arrow::Status Finish(ArrayList* out) override;

// template <typename VALUE_TYPE, typename CType, typename ArrayType,
// precompile::enable_if_number<VALUE_TYPE>> CType
// getElement(std::shared_ptr<ArrayType> typed_array, uint32_t id);

// template <typename VALUE_TYPE, typename CType, typename ArrayType,
// precompile::enable_if_string_like<VALUE_TYPE>> CType
// getElement(std::shared_ptr<ArrayType> typed_array, uint32_t id);

template <typename VALUE_TYPE, typename CType, typename BuilderType, typename ArrayType>
template <typename VALUE_TYPE, typename CType, typename BuilderType, typename ArrayType, typename OP>
arrow::Status HandleSortedPartition(
std::vector<ArrayList>& values,
std::vector<std::shared_ptr<arrow::Int32Array>>& group_ids, int32_t max_group_id,
std::vector<std::vector<std::shared_ptr<ArrayItemIndexS>>>& sorted_partitions,
ArrayList* out);
ArrayList* out, OP op);

private:
// positive offset means lag to the above row from the current row with an offset.
Expand Down
163 changes: 88 additions & 75 deletions native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,72 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) {
return arrow::Status::OK();
}

static arrow::Status EncodeIndices(std::vector<std::shared_ptr<ArrayItemIndexS>> in,
std::shared_ptr<arrow::Array>* out) {
arrow::UInt64Builder builder;
for (const auto& each : in) {
uint64_t encoded = ((uint64_t)(each->array_id) << 32U) ^ ((uint64_t)(each->id));
RETURN_NOT_OK(builder.Append(encoded));
}
RETURN_NOT_OK(builder.Finish(out));
return arrow::Status::OK();
}

static arrow::Status DecodeIndices(std::shared_ptr<arrow::Array> in,
std::vector<std::shared_ptr<ArrayItemIndexS>>* out) {
std::vector<std::shared_ptr<ArrayItemIndexS>> v;
std::shared_ptr<arrow::UInt64Array> selected =
std::dynamic_pointer_cast<arrow::UInt64Array>(in);
for (int i = 0; i < selected->length(); i++) {
uint64_t encoded = selected->GetView(i);
uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U;
uint32_t id = encoded & 0xFFFFFFFFU;
v.push_back(std::make_shared<ArrayItemIndexS>(array_id, id));
}
*out = v;
return arrow::Status::OK();
}

arrow::Status WindowRankKernel::SortToIndicesPrepare(std::vector<ArrayList> values) {
for (auto each_batch : values) {
RETURN_NOT_OK(sorter_->Evaluate(each_batch));
}
return arrow::Status::OK();
// todo sort algorithm
}

arrow::Status WindowRankKernel::SortToIndicesFinish(
std::vector<std::shared_ptr<ArrayItemIndexS>> elements_to_sort,
std::vector<std::shared_ptr<ArrayItemIndexS>>* offsets) {
std::shared_ptr<arrow::Array> in;
std::shared_ptr<arrow::Array> out;
RETURN_NOT_OK(EncodeIndices(elements_to_sort, &in));
RETURN_NOT_OK(sorter_->Finish(in, &out));
std::vector<std::shared_ptr<ArrayItemIndexS>> decoded_out;
RETURN_NOT_OK(DecodeIndices(out, &decoded_out));
*offsets = decoded_out;
return arrow::Status::OK();
// todo sort algorithm
}

template <typename ArrayType>
arrow::Status WindowRankKernel::AreTheSameValue(const std::vector<ArrayList>& values,
int column,
std::shared_ptr<ArrayItemIndexS> i,
std::shared_ptr<ArrayItemIndexS> j,
bool* out) {
if (is_row_number_) {
*out = false;
return arrow::Status::OK();
}
auto typed_array_i =
std::dynamic_pointer_cast<ArrayType>(values.at(i->array_id).at(column));
auto typed_array_j =
std::dynamic_pointer_cast<ArrayType>(values.at(j->array_id).at(column));
*out = (typed_array_i->GetView(i->id) == typed_array_j->GetView(j->id));
return arrow::Status::OK();
}

WindowLagKernel::WindowLagKernel(
arrow::compute::ExecContext* ctx,
std::vector<std::shared_ptr<arrow::DataType>> type_list,
Expand Down Expand Up @@ -555,6 +621,18 @@ arrow::Status WindowLagKernel::Make(
return arrow::Status::OK();
}

// The interfaces for string type and non-string type are different. So we implemented the
// below two functions which will be passed as argument in HandleSortedPartition.
template<typename ArrayType, typename CType>
CType get_string_value(std::shared_ptr<ArrayType> array, uint32_t index) {
return array->GetString(index);
}

template<typename ArrayType, typename CType>
CType get_nonstring_value(std::shared_ptr<ArrayType> array, uint32_t index) {
return array->GetView(index);
}

arrow::Status WindowLagKernel::Finish(ArrayList* out) {
std::vector<ArrayList> values; // The window function input.
std::vector<ArrayList> sort_values; // Sort input.
Expand Down Expand Up @@ -671,16 +749,17 @@ arrow::Status WindowLagKernel::Finish(ArrayList* out) {
case VALUE_TYPE::type_id: { \
using CType = typename arrow::TypeTraits<VALUE_TYPE>::CType; \
RETURN_NOT_OK((HandleSortedPartition<VALUE_TYPE, CType, BUILDER_TYPE, ARRAY_TYPE>( \
values, group_ids, max_group_id, sorted_partitions, out))); \
values, group_ids, max_group_id, sorted_partitions, out, \
get_nonstring_value<ARRAY_TYPE, CType>))); \
} break;
PROCESS_SUPPORTED_COMMON_TYPES_LAG(PROCESS)
#undef PROCESS
#undef PROCESS_SUPPORTED_COMMON_TYPES_LAG
// case arrow::StringType::type_id: {
// RETURN_NOT_OK((HandleSortedPartition<arrow::StringType, std::string,
// arrow::StringBuilder, arrow::StringArray>(
// values, group_ids, max_group_id, sorted_partitions, out)));
// } break;
case arrow::StringType::type_id: {
RETURN_NOT_OK((HandleSortedPartition<arrow::StringType, std::string,
arrow::StringBuilder, arrow::StringArray>(
values, group_ids, max_group_id, sorted_partitions, out, get_string_value<arrow::StringArray, std::string>)));
} break;
default: {
return arrow::Status::Invalid("window function: unsupported input type: " +
value_type->name());
Expand All @@ -689,12 +768,12 @@ arrow::Status WindowLagKernel::Finish(ArrayList* out) {
return arrow::Status::OK();
}

template <typename VALUE_TYPE, typename CType, typename BuilderType, typename ArrayType>
template <typename VALUE_TYPE, typename CType, typename BuilderType, typename ArrayType, typename OP>
arrow::Status WindowLagKernel::HandleSortedPartition(
std::vector<ArrayList>& values,
std::vector<std::shared_ptr<arrow::Int32Array>>& group_ids, int32_t max_group_id,
std::vector<std::vector<std::shared_ptr<ArrayItemIndexS>>>& sorted_partitions,
ArrayList* out) {
ArrayList* out, OP op) {
CType** lag_array = new CType*[group_ids.size()];
for (int i = 0; i < group_ids.size(); i++) {
*(lag_array + i) = new CType[group_ids.at(i)->length()];
Expand Down Expand Up @@ -736,7 +815,7 @@ arrow::Status WindowLagKernel::HandleSortedPartition(
validity[index->array_id][index->id] = false;
} else {
lag_array[index->array_id][index->id] =
typed_array->GetView(offset_index->id);
op(typed_array, offset_index->id);
validity[index->array_id][index->id] = true;
}
}
Expand Down Expand Up @@ -774,72 +853,6 @@ arrow::Status WindowLagKernel::HandleSortedPartition(
return arrow::Status::OK();
}

static arrow::Status EncodeIndices(std::vector<std::shared_ptr<ArrayItemIndexS>> in,
std::shared_ptr<arrow::Array>* out) {
arrow::UInt64Builder builder;
for (const auto& each : in) {
uint64_t encoded = ((uint64_t)(each->array_id) << 32U) ^ ((uint64_t)(each->id));
RETURN_NOT_OK(builder.Append(encoded));
}
RETURN_NOT_OK(builder.Finish(out));
return arrow::Status::OK();
}

static arrow::Status DecodeIndices(std::shared_ptr<arrow::Array> in,
std::vector<std::shared_ptr<ArrayItemIndexS>>* out) {
std::vector<std::shared_ptr<ArrayItemIndexS>> v;
std::shared_ptr<arrow::UInt64Array> selected =
std::dynamic_pointer_cast<arrow::UInt64Array>(in);
for (int i = 0; i < selected->length(); i++) {
uint64_t encoded = selected->GetView(i);
uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U;
uint32_t id = encoded & 0xFFFFFFFFU;
v.push_back(std::make_shared<ArrayItemIndexS>(array_id, id));
}
*out = v;
return arrow::Status::OK();
}

arrow::Status WindowRankKernel::SortToIndicesPrepare(std::vector<ArrayList> values) {
for (auto each_batch : values) {
RETURN_NOT_OK(sorter_->Evaluate(each_batch));
}
return arrow::Status::OK();
// todo sort algorithm
}

arrow::Status WindowRankKernel::SortToIndicesFinish(
std::vector<std::shared_ptr<ArrayItemIndexS>> elements_to_sort,
std::vector<std::shared_ptr<ArrayItemIndexS>>* offsets) {
std::shared_ptr<arrow::Array> in;
std::shared_ptr<arrow::Array> out;
RETURN_NOT_OK(EncodeIndices(elements_to_sort, &in));
RETURN_NOT_OK(sorter_->Finish(in, &out));
std::vector<std::shared_ptr<ArrayItemIndexS>> decoded_out;
RETURN_NOT_OK(DecodeIndices(out, &decoded_out));
*offsets = decoded_out;
return arrow::Status::OK();
// todo sort algorithm
}

template <typename ArrayType>
arrow::Status WindowRankKernel::AreTheSameValue(const std::vector<ArrayList>& values,
int column,
std::shared_ptr<ArrayItemIndexS> i,
std::shared_ptr<ArrayItemIndexS> j,
bool* out) {
if (is_row_number_) {
*out = false;
return arrow::Status::OK();
}
auto typed_array_i =
std::dynamic_pointer_cast<ArrayType>(values.at(i->array_id).at(column));
auto typed_array_j =
std::dynamic_pointer_cast<ArrayType>(values.at(j->array_id).at(column));
*out = (typed_array_i->GetView(i->id) == typed_array_j->GetView(j->id));
return arrow::Status::OK();
}

#undef PROCESS_SUPPORTED_TYPES_WINDOW

} // namespace extra
Expand Down

0 comments on commit a1afd81

Please sign in to comment.