Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
fix window sort
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Zhou <[email protected]>
  • Loading branch information
zhouyuan committed Aug 8, 2022
1 parent 3cdd623 commit 2e300a6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ class WindowRankKernel : public KernalBase {

arrow::Status SortToIndicesPrepare(std::vector<ArrayList> values);
arrow::Status SortToIndicesFinish(
std::vector<std::shared_ptr<ArrayItemIndex>> elements_to_sort,
std::vector<std::shared_ptr<ArrayItemIndex>>* offsets);
std::vector<std::shared_ptr<ArrayItemIndexS>> elements_to_sort,
std::vector<std::shared_ptr<ArrayItemIndexS>>* offsets);

template <typename ArrayType>
arrow::Status AreTheSameValue(const std::vector<ArrayList>& values, int column,
std::shared_ptr<ArrayItemIndex> i,
std::shared_ptr<ArrayItemIndex> j, bool* out);
std::shared_ptr<ArrayItemIndexS> i,
std::shared_ptr<ArrayItemIndexS> j, bool* out);

private:
std::shared_ptr<WindowSortKernel::Impl> sorter_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) {
#endif

// initialize partitions to be sorted
std::vector<std::vector<std::shared_ptr<ArrayItemIndex>>> partitions_to_sort;
std::vector<std::vector<std::shared_ptr<ArrayItemIndexS>>> partitions_to_sort;
for (int i = 0; i <= max_group_id; i++) {
partitions_to_sort.emplace_back();
}
Expand All @@ -376,18 +376,18 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) {
}
uint64_t partition_id = slice->GetView(j);
partitions_to_sort.at(partition_id)
.push_back(std::make_shared<ArrayItemIndex>(i, j));
.push_back(std::make_shared<ArrayItemIndexS>(i, j));
}
}
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif

std::vector<std::vector<std::shared_ptr<ArrayItemIndex>>> sorted_partitions;
std::vector<std::vector<std::shared_ptr<ArrayItemIndexS>>> sorted_partitions;
RETURN_NOT_OK(SortToIndicesPrepare(values));
for (int i = 0; i <= max_group_id; i++) {
std::vector<std::shared_ptr<ArrayItemIndex>> partition = partitions_to_sort.at(i);
std::vector<std::shared_ptr<ArrayItemIndex>> sorted_partition;
std::vector<std::shared_ptr<ArrayItemIndexS>> partition = partitions_to_sort.at(i);
std::vector<std::shared_ptr<ArrayItemIndexS>> sorted_partition;
#ifdef DEBUG
std::cout << "[window kernel] Sorting a single partition... " << std::endl;
#endif
Expand All @@ -406,17 +406,17 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) {
std::cout << "[window kernel] Generating rank result on a single partition... "
<< std::endl;
#endif
std::vector<std::shared_ptr<ArrayItemIndex>> sorted_partition =
std::vector<std::shared_ptr<ArrayItemIndexS>> sorted_partition =
sorted_partitions.at(i);
int assumed_rank = 0;
for (int j = 0; j < sorted_partition.size(); j++) {
++assumed_rank; // rank value starts from 1
std::shared_ptr<ArrayItemIndex> index = sorted_partition.at(j);
std::shared_ptr<ArrayItemIndexS> index = sorted_partition.at(j);
if (j == 0) {
rank_array[index->array_id][index->id] = 1; // rank value starts from 1
continue;
}
std::shared_ptr<ArrayItemIndex> last_index = sorted_partition.at(j - 1);
std::shared_ptr<ArrayItemIndexS> last_index = sorted_partition.at(j - 1);
bool same = true;
for (int column_id = 0; column_id < type_list_.size(); column_id++) {
bool s = false;
Expand Down Expand Up @@ -478,27 +478,27 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) {
return arrow::Status::OK();
}

static arrow::Status EncodeIndices(std::vector<std::shared_ptr<ArrayItemIndex>> in,
static arrow::Status EncodeIndices(std::vector<std::shared_ptr<ArrayItemIndexS>> in,
std::shared_ptr<arrow::Array>* out) {
arrow::UInt64Builder builder;
for (const auto& each : in) {
uint64_t encoded = ((uint64_t)(each->array_id) << 16U) ^ ((uint64_t)(each->id));
uint64_t encoded = ((uint64_t)(each->array_id) << 32U) ^ ((uint64_t)(each->id));
RETURN_NOT_OK(builder.Append(encoded));
}
RETURN_NOT_OK(builder.Finish(out));
return arrow::Status::OK();
}

static arrow::Status DecodeIndices(std::shared_ptr<arrow::Array> in,
std::vector<std::shared_ptr<ArrayItemIndex>>* out) {
std::vector<std::shared_ptr<ArrayItemIndex>> v;
std::vector<std::shared_ptr<ArrayItemIndexS>>* out) {
std::vector<std::shared_ptr<ArrayItemIndexS>> v;
std::shared_ptr<arrow::UInt64Array> selected =
std::dynamic_pointer_cast<arrow::UInt64Array>(in);
for (int i = 0; i < selected->length(); i++) {
uint64_t encoded = selected->GetView(i);
uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U;
uint16_t id = encoded & 0xFFFFU;
v.push_back(std::make_shared<ArrayItemIndex>(array_id, id));
uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U;
uint32_t id = encoded & 0xFFFFFFFFU;
v.push_back(std::make_shared<ArrayItemIndexS>(array_id, id));
}
*out = v;
return arrow::Status::OK();
Expand All @@ -513,13 +513,13 @@ arrow::Status WindowRankKernel::SortToIndicesPrepare(std::vector<ArrayList> valu
}

arrow::Status WindowRankKernel::SortToIndicesFinish(
std::vector<std::shared_ptr<ArrayItemIndex>> elements_to_sort,
std::vector<std::shared_ptr<ArrayItemIndex>>* offsets) {
std::vector<std::shared_ptr<ArrayItemIndexS>> elements_to_sort,
std::vector<std::shared_ptr<ArrayItemIndexS>>* offsets) {
std::shared_ptr<arrow::Array> in;
std::shared_ptr<arrow::Array> out;
RETURN_NOT_OK(EncodeIndices(elements_to_sort, &in));
RETURN_NOT_OK(sorter_->Finish(in, &out));
std::vector<std::shared_ptr<ArrayItemIndex>> decoded_out;
std::vector<std::shared_ptr<ArrayItemIndexS>> decoded_out;
RETURN_NOT_OK(DecodeIndices(out, &decoded_out));
*offsets = decoded_out;
return arrow::Status::OK();
Expand All @@ -529,8 +529,8 @@ arrow::Status WindowRankKernel::SortToIndicesFinish(
template <typename ArrayType>
arrow::Status WindowRankKernel::AreTheSameValue(const std::vector<ArrayList>& values,
int column,
std::shared_ptr<ArrayItemIndex> i,
std::shared_ptr<ArrayItemIndex> j,
std::shared_ptr<ArrayItemIndexS> i,
std::shared_ptr<ArrayItemIndexS> j,
bool* out) {
if (is_row_number_) {
*out = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,16 @@ class TypedSorterImpl : public CodeGenBase {
for (int i = 0; i < selected->length(); i++) {
uint64_t encoded = selected->GetView(i);
uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U;
uint16_t id = encoded & 0xFFFFU;
uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U;
uint32_t id = encoded & 0xFFFFFFFFU;
(indices_begin + indices_i)->array_id = array_id;
(indices_begin + indices_i)->id = id;
indices_i++;
}
)" + sort_func_str +
R"(
std::shared_ptr<arrow::FixedSizeBinaryType> out_type;
RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type));
RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int64_t), &out_type));
RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total, indices_buf, out));
return arrow::Status::OK();
}
Expand All @@ -273,7 +273,7 @@ class TypedSorterImpl : public CodeGenBase {
arrow::UInt64Builder builder;
auto *index = (ArrayItemIndexS *) indices_out->value_data();
for (int i = 0; i < indices_out->length(); i++) {
uint64_t encoded = ((uint64_t) (index->array_id) << 16U) ^ ((uint64_t) (index->id));
uint64_t encoded = ((uint64_t) (index->array_id) << 32U) ^ ((uint64_t) (index->id));
RETURN_NOT_OK(builder.Append(encoded));
index++;
}
Expand Down Expand Up @@ -538,8 +538,8 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl {
std::dynamic_pointer_cast<arrow::UInt64Array>(in);
for (int i = 0; i < selected->length(); i++) {
uint64_t encoded = selected->GetView(i);
uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U;
uint16_t id = encoded & 0xFFFFU;
uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U;
uint32_t id = encoded & 0xFFFFFFFFU;
auto key_clip = cached_key_.at(array_id);
if (key_clip->IsNull(id)) {
nulls_total++;
Expand All @@ -561,8 +561,8 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl {
// we should also support desc and asc here
for (int i = 0; i < selected->length(); i++) {
uint64_t encoded = selected->GetView(i);
uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U;
uint16_t id = encoded & 0xFFFFU;
uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U;
uint32_t id = encoded & 0xFFFFFFFFU;
auto key_clip = cached_key_.at(array_id);
if (nulls_first_) {
if (!key_clip->IsNull(id)) {
Expand Down Expand Up @@ -621,7 +621,7 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl {
arrow::UInt64Builder builder;
auto* index = (ArrayItemIndexS*)indices_out->value_data();
for (int i = 0; i < indices_out->length(); i++) {
uint64_t encoded = ((uint64_t)(index->array_id) << 16U) ^ ((uint64_t)(index->id));
uint64_t encoded = ((uint64_t)(index->array_id) << 32U) ^ ((uint64_t)(index->id));
RETURN_NOT_OK(builder.Append(encoded));
index++;
}
Expand Down

0 comments on commit 2e300a6

Please sign in to comment.