Skip to content

Commit

Permalink
remove sort limit
Browse files: browse the repository at this point in the history
Signed-off-by: Yuan Zhou <[email protected]>
  • Loading branch information
zhouyuan committed Jul 20, 2022
1 parent 697e0de commit 6b5738a
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ struct ArrayItemIndex {
ArrayItemIndex() : array_id(0), id(0) {}
ArrayItemIndex(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {}
};
/// Compact locator for a single element: `array_id` picks an array and `id`
/// picks the element inside that array. Both fields are 16-bit, so neither an
/// array index nor an element index above 65535 can be represented.
///
/// Members are declared `id` first, then `array_id`; the constructor's
/// initializer list follows that same order because C++ always initializes
/// members in declaration order regardless of how the list is written.
struct ArrayItemIndexS {
  uint16_t id = 0;
  uint16_t array_id = 0;

  /// Builds an index with both fields zero (via the default member initializers).
  ArrayItemIndexS() = default;

  /// @param array_id index of the array the element belongs to.
  /// @param id       index of the element within that array.
  ArrayItemIndexS(uint16_t array_id, uint16_t id) : id(id), array_id(array_id) {}
};

} // namespace extra
} // namespace arrowcompute
} // namespace codegen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class TakerBase {
return arrow::Status::NotImplemented("TakerBase Finish is abstract.");
}

virtual arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
virtual arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
return arrow::Status::NotImplemented("TakerBase TakeFromIndices is abstract.");
}
Expand Down Expand Up @@ -101,7 +101,7 @@ class ArrayTaker<DataType, CType, enable_if_number_or_date<DataType>> : public T
return arrow::Status::OK();
}

arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
arrow::ArrayData out_data;
out_data.length = length;
Expand Down Expand Up @@ -181,7 +181,7 @@ class ArrayTaker<DataType, CType, arrow::enable_if_boolean<DataType>> : public T
return arrow::Status::OK();
}

arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
arrow::ArrayData out_data;
out_data.length = length;
Expand Down Expand Up @@ -266,7 +266,7 @@ class ArrayTaker<DataType, CType, enable_if_decimal<DataType>> : public TakerBas
return arrow::Status::OK();
}

arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
arrow::ArrayData out_data;
out_data.length = length;
Expand Down Expand Up @@ -352,7 +352,7 @@ class ArrayTaker<DataType, CType, arrow::enable_if_same<DataType, arrow::StringT
return arrow::Status::OK();
}

arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
for (int64_t position = 0; position < length; position++) {
auto item = indices_begin + position;
Expand Down Expand Up @@ -410,7 +410,7 @@ class ComplexArrayTaker : public TakerBase {
return arrow::Status::OK();
}

arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
for (int64_t position = 0; position < length; position++) {
auto item = indices_begin + position;
Expand Down Expand Up @@ -458,7 +458,7 @@ class ArrayTaker<DataType, CType, enable_if_timestamp<DataType>> : public TakerB
return arrow::Status::OK();
}

arrow::Status TakeFromIndices(ArrayItemIndexS* indices_begin, int64_t length,
arrow::Status TakeFromIndices(ArrayItemIndex* indices_begin, int64_t length,
std::shared_ptr<arrow::Array>* out) {
arrow::ArrayData out_data;
out_data.length = length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ class ConditionedMergeJoinKernel::Impl {
auto function_name = "ConditionCheck_" + std::to_string(relation_id_[0]);
if (use_relation_for_stream) {
function_define_ss << "inline bool " << function_name
<< "(ArrayItemIndexS idx_0, ArrayItemIndexS idx_1) {"
<< "(ArrayItemIndex idx_0, ArrayItemIndex idx_1) {"
<< std::endl;
} else {
function_define_ss << "inline bool " << function_name
<< "(ArrayItemIndexS idx_0) {" << std::endl;
<< "(ArrayItemIndex idx_0) {" << std::endl;
}
function_define_ss << condition_node_visitor->GetPrepare() << std::endl;
function_define_ss << "return " << condition_node_visitor->GetResult() << ";"
Expand Down Expand Up @@ -346,11 +346,11 @@ class ConditionedMergeJoinKernel::Impl {
<< "->GetItemIndexWithShift(" << streamed_range_id << ");"
<< std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
}
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << left_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
if (cache_right) {
codes_ss << right_for_loop_codes.str();
Expand Down Expand Up @@ -435,11 +435,11 @@ class ConditionedMergeJoinKernel::Impl {
<< "->GetItemIndexWithShift(" << streamed_range_id << ");"
<< std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
}
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << left_index_name << ";" << std::endl;
prepare_ss << "bool " << fill_null_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
if (cache_right) {
Expand Down Expand Up @@ -538,11 +538,11 @@ class ConditionedMergeJoinKernel::Impl {
codes_ss << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");" << std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
}
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << left_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
codes_ss << "for (int " << range_id << " = 0; " << range_id << " < 1;" << range_id
<< "++) {" << std::endl;
Expand Down Expand Up @@ -631,11 +631,11 @@ class ConditionedMergeJoinKernel::Impl {
codes_ss << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");" << std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
}
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << left_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
codes_ss << "for (int " << range_id << " = 0; " << range_id << " < 1;" << range_id
<< "++) {" << std::endl;
Expand Down Expand Up @@ -720,11 +720,11 @@ class ConditionedMergeJoinKernel::Impl {
codes_ss << right_index_name << " = " << streamed_relation
<< "->GetItemIndexWithShift(" << streamed_range_id << ");" << std::endl;
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << right_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << right_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
}
std::stringstream prepare_ss;
prepare_ss << "ArrayItemIndexS " << left_index_name << ";" << std::endl;
prepare_ss << "ArrayItemIndex " << left_index_name << ";" << std::endl;
(*output)->definition_codes += prepare_ss.str();
codes_ss << "for (int " << range_id << " = 0; " << range_id << " < 1;" << range_id
<< "++) {" << std::endl;
Expand Down
70 changes: 35 additions & 35 deletions native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class SortArraysToIndicesKernel::Impl {
"SorterResultIterator received cache with " + std::to_string(cached.size()) +
", which does not match expected num_cols of " + std::to_string(col_num_));
}
indices_begin_ = (ArrayItemIndexS*)indices_in->value_data();
indices_begin_ = (ArrayItemIndex*)indices_in->value_data();
for (int i = 0; i < col_num_; i++) {
auto field = schema->field(i);
std::shared_ptr<TakerBase> taker;
Expand Down Expand Up @@ -337,7 +337,7 @@ class SortArraysToIndicesKernel::Impl {
arrow::compute::ExecContext* ctx_;
uint64_t batch_size_;
int col_num_;
ArrayItemIndexS* indices_begin_;
ArrayItemIndex* indices_begin_;
std::vector<std::shared_ptr<arrow::DataType>> type_list_;
std::vector<std::shared_ptr<TakerBase>> taker_list_;
std::shared_ptr<FixedSizeBinaryArray> indices_in_cache_;
Expand Down Expand Up @@ -1392,7 +1392,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
return arrow::Status::OK();
}

void PartitionNulls(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) {
void PartitionNulls(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end) {
int64_t indices_i = 0;
int64_t indices_null = 0;

Expand Down Expand Up @@ -1456,7 +1456,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
}
}

int64_t PartitionNaNs(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) {
int64_t PartitionNaNs(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end) {
int64_t indices_i = 0;
int64_t indices_nan = 0;

Expand Down Expand Up @@ -1529,7 +1529,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
}

template <typename T>
auto Partition(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end,
auto Partition(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end,
int64_t& num_nan) ->
typename std::enable_if_t<arrow::is_floating_type<T>::value> {
PartitionNulls(indices_begin, indices_end);
Expand All @@ -1539,14 +1539,14 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
}

template <typename T>
auto Partition(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end,
auto Partition(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end,
int64_t& num_nan) ->
typename std::enable_if_t<!arrow::is_floating_type<T>::value> {
PartitionNulls(indices_begin, indices_end);
}

template <typename T>
auto Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end, int64_t num_nan)
auto Sort(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end, int64_t num_nan)
-> typename std::enable_if_t<is_number_bool_date<T>::value> {
if (asc_) {
if (nulls_first_) {
Expand All @@ -1561,7 +1561,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
});
}
} else {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return cached_key_[x.array_id]->GetView(x.id) >
cached_key_[y.array_id]->GetView(y.id);
};
Expand All @@ -1576,10 +1576,10 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
}

template <typename T>
auto Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end, int64_t num_nan)
auto Sort(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end, int64_t num_nan)
-> typename std::enable_if_t<std::is_same<T, arrow::StringType>::value> {
if (asc_) {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return cached_key_[x.array_id]->GetView(x.id) <
cached_key_[y.array_id]->GetView(y.id);
};
Expand All @@ -1589,7 +1589,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
gfx::timsort(indices_begin, indices_begin + items_total_ - nulls_total_, comp);
}
} else {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return cached_key_[x.array_id]->GetView(x.id) >
cached_key_[y.array_id]->GetView(y.id);
};
Expand All @@ -1602,10 +1602,10 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
}

template <typename T>
auto Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end, int64_t num_nan)
auto Sort(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end, int64_t num_nan)
-> typename std::enable_if_t<arrow::is_decimal_type<T>::value> {
if (asc_) {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return cached_key_[x.array_id]->GetView(x.id) <
cached_key_[y.array_id]->GetView(y.id);
};
Expand All @@ -1617,7 +1617,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
comp);
}
} else {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return cached_key_[x.array_id]->GetView(x.id) >
cached_key_[y.array_id]->GetView(y.id);
};
Expand All @@ -1634,19 +1634,19 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
arrow::Status FinishInternal(std::shared_ptr<FixedSizeBinaryArray>* out) {
// initiate buffer for all arrays
std::shared_ptr<arrow::Buffer> indices_buf;
int64_t buf_size = items_total_ * sizeof(ArrayItemIndexS);
int64_t buf_size = items_total_ * sizeof(ArrayItemIndex);
auto maybe_buffer = arrow::AllocateBuffer(buf_size, ctx_->memory_pool());
indices_buf = *std::move(maybe_buffer);
ArrayItemIndexS* indices_begin =
reinterpret_cast<ArrayItemIndexS*>(indices_buf->mutable_data());
ArrayItemIndexS* indices_end = indices_begin + items_total_;
ArrayItemIndex* indices_begin =
reinterpret_cast<ArrayItemIndex*>(indices_buf->mutable_data());
ArrayItemIndex* indices_end = indices_begin + items_total_;
// do partition and sort here
int64_t num_nan = 0;
Partition<DATATYPE>(indices_begin, indices_end, num_nan);
Sort<DATATYPE>(indices_begin, indices_end, num_nan);
std::shared_ptr<arrow::FixedSizeBinaryType> out_type;
RETURN_NOT_OK(
MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type));
MakeFixedSizeBinaryType(sizeof(ArrayItemIndex) / sizeof(int32_t), &out_type));
RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total_, indices_buf, out));

return arrow::Status::OK();
Expand Down Expand Up @@ -2021,13 +2021,13 @@ class TypedSorterImpl : public CodeGenBase {
R"(
// initiate buffer for all arrays
std::shared_ptr<arrow::Buffer> indices_buf;
int64_t buf_size = items_total_ * sizeof(ArrayItemIndexS);
int64_t buf_size = items_total_ * sizeof(ArrayItemIndex);
auto maybe_buffer = arrow::AllocateBuffer(buf_size, ctx_->memory_pool());
indices_buf = *std::move(maybe_buffer);
ArrayItemIndexS* indices_begin =
reinterpret_cast<ArrayItemIndexS*>(indices_buf->mutable_data());
ArrayItemIndexS* indices_end = indices_begin + items_total_;
ArrayItemIndex* indices_begin =
reinterpret_cast<ArrayItemIndex*>(indices_buf->mutable_data());
ArrayItemIndex* indices_end = indices_begin + items_total_;
int64_t indices_i = 0;
ARROW_CHECK_LE(num_batches_, 64 * 1024);
Expand All @@ -2043,7 +2043,7 @@ class TypedSorterImpl : public CodeGenBase {
)" + sort_func_str +
R"(
std::shared_ptr<arrow::FixedSizeBinaryType> out_type;
RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type));
RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndex) / sizeof(int32_t), &out_type));
RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total_, indices_buf, out));
return arrow::Status::OK();
}
Expand Down Expand Up @@ -2110,13 +2110,13 @@ extern "C" void MakeCodeGen(arrow::compute::ExecContext* ctx,
projected = false;
}
if (has_null) {
ss << "auto comp = [this](const ArrayItemIndexS& x, const "
"ArrayItemIndexS& y) {"
ss << "auto comp = [this](const ArrayItemIndex& x, const "
"ArrayItemIndex& y) {"
<< GetCompFunction_(0, projected, key_field_list_, projected_types_,
sort_directions_, nulls_order_);
} else {
ss << "auto comp_without_null = "
<< "[this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {"
<< "[this](const ArrayItemIndex& x, const ArrayItemIndex& y) {"
<< GetCompFunction_Without_Null_(0, projected, key_field_list_, projected_types_,
sort_directions_);
}
Expand Down Expand Up @@ -2579,15 +2579,15 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl {
return arrow::Status::OK();
}

void Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) {
void Sort(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end) {
int keys_num = sort_directions_.size();
auto comp = [this, &keys_num](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
auto comp = [this, &keys_num](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return compareRow(x.array_id, x.id, y.array_id, y.id, keys_num);
};
gfx::timsort(indices_begin, indices_begin + items_total_, comp);
}

void Partition(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) {
void Partition(ArrayItemIndex* indices_begin, ArrayItemIndex* indices_end) {
int64_t indices_i = 0;
int64_t indices_null = 0;
ARROW_CHECK_LE(num_batches_, 64 * 1024);
Expand All @@ -2604,12 +2604,12 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl {
arrow::Status FinishInternal(std::shared_ptr<FixedSizeBinaryArray>* out) {
// initiate buffer for all arrays
std::shared_ptr<arrow::Buffer> indices_buf;
int64_t buf_size = items_total_ * sizeof(ArrayItemIndexS);
int64_t buf_size = items_total_ * sizeof(ArrayItemIndex);
auto maybe_buffer = arrow::AllocateBuffer(buf_size, ctx_->memory_pool());
indices_buf = *std::move(maybe_buffer);
ArrayItemIndexS* indices_begin =
reinterpret_cast<ArrayItemIndexS*>(indices_buf->mutable_data());
ArrayItemIndexS* indices_end = indices_begin + items_total_;
ArrayItemIndex* indices_begin =
reinterpret_cast<ArrayItemIndex*>(indices_buf->mutable_data());
ArrayItemIndex* indices_end = indices_begin + items_total_;
// do partition and sort here
Partition(indices_begin, indices_end);
if (key_projector_) {
Expand All @@ -2627,7 +2627,7 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl {
Sort(indices_begin, indices_end);
std::shared_ptr<arrow::FixedSizeBinaryType> out_type;
RETURN_NOT_OK(
MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type));
MakeFixedSizeBinaryType(sizeof(ArrayItemIndex) / sizeof(int32_t), &out_type));
RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total_, indices_buf, out));
return arrow::Status::OK();
}
Expand Down
Loading

0 comments on commit 6b5738a

Please sign in to comment.