Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-999] use TimSort for STRING/DECIMAL onekey based sorting #1017

Merged
merged 6 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unittests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
cmake .. -DNATIVE_AVX512=OFF -DBUILD_ARROW=0 -DTESTS=1
make
cd src
ctest -R
ctest -V

scala-unit-test-hadoop-2-7:
runs-on: ubuntu-latest
Expand Down
32 changes: 16 additions & 16 deletions native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1566,11 +1566,11 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
cached_key_[y.array_id]->GetView(y.id);
};
if (nulls_first_) {
std::sort(indices_begin + nulls_total_ + num_nan, indices_begin + items_total_,
comp);
gfx::timsort(indices_begin + nulls_total_ + num_nan, indices_begin + items_total_,
comp);
} else {
std::sort(indices_begin + num_nan, indices_begin + items_total_ - nulls_total_,
comp);
gfx::timsort(indices_begin + num_nan, indices_begin + items_total_ - nulls_total_,
comp);
}
}
}
Expand All @@ -1584,19 +1584,19 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
cached_key_[y.array_id]->GetString(y.id);
};
if (nulls_first_) {
std::sort(indices_begin + nulls_total_, indices_begin + items_total_, comp);
gfx::timsort(indices_begin + nulls_total_, indices_begin + items_total_, comp);
} else {
std::sort(indices_begin, indices_begin + items_total_ - nulls_total_, comp);
gfx::timsort(indices_begin, indices_begin + items_total_ - nulls_total_, comp);
}
} else {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
return cached_key_[x.array_id]->GetString(x.id) >
cached_key_[y.array_id]->GetString(y.id);
};
if (nulls_first_) {
std::sort(indices_begin + nulls_total_, indices_begin + items_total_, comp);
gfx::timsort(indices_begin + nulls_total_, indices_begin + items_total_, comp);
} else {
std::sort(indices_begin, indices_begin + items_total_ - nulls_total_, comp);
gfx::timsort(indices_begin, indices_begin + items_total_ - nulls_total_, comp);
}
}
}
Expand All @@ -1610,23 +1610,23 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
cached_key_[y.array_id]->GetView(y.id);
};
if (nulls_first_) {
std::sort(indices_begin + nulls_total_, indices_begin + items_total_ - num_nan,
comp);
gfx::timsort(indices_begin + nulls_total_, indices_begin + items_total_ - num_nan,
comp);
} else {
std::sort(indices_begin, indices_begin + items_total_ - nulls_total_ - num_nan,
comp);
gfx::timsort(indices_begin, indices_begin + items_total_ - nulls_total_ - num_nan,
comp);
}
} else {
auto comp = [this](const ArrayItemIndexS& x, const ArrayItemIndexS& y) {
return cached_key_[x.array_id]->GetView(x.id) >
cached_key_[y.array_id]->GetView(y.id);
};
if (nulls_first_) {
std::sort(indices_begin + nulls_total_ + num_nan, indices_begin + items_total_,
comp);
gfx::timsort(indices_begin + nulls_total_ + num_nan, indices_begin + items_total_,
comp);
} else {
std::sort(indices_begin + num_nan, indices_begin + items_total_ - nulls_total_,
comp);
gfx::timsort(indices_begin + num_nan, indices_begin + items_total_ - nulls_total_,
comp);
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions native-sql-engine/cpp/src/tests/arrow_compute_test_externalsort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace sparkcolumnarplugin {
namespace codegen {

TEST(TestArrowComputeSort, SortTestInplaceDouble) {
TEST(TestArrowComputeExternalSort, SortTestInplaceDouble) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", float64());
Expand Down Expand Up @@ -124,7 +124,7 @@ TEST(TestArrowComputeSort, SortTestInplaceDouble) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestInplaceDecimal) {
TEST(TestArrowComputeExternalSort, SortTestInplaceDecimal) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", decimal128(10, 4));
Expand Down Expand Up @@ -205,7 +205,7 @@ TEST(TestArrowComputeSort, SortTestInplaceDecimal) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestOneKey) {
TEST(TestArrowComputeExternalSort, SortTestOneKey) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", float64());
Expand Down Expand Up @@ -325,7 +325,7 @@ TEST(TestArrowComputeSort, SortTestOneKey) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestOneKeyDecimal) {
TEST(TestArrowComputeExternalSort, SortTestOneKeyDecimal) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", decimal128(10, 4));
Expand Down Expand Up @@ -420,7 +420,7 @@ TEST(TestArrowComputeSort, SortTestOneKeyDecimal) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestOneKeyStr) {
TEST(TestArrowComputeExternalSort, SortTestOneKeyStr) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", utf8());
Expand Down Expand Up @@ -488,7 +488,7 @@ TEST(TestArrowComputeSort, SortTestOneKeyStr) {
std::shared_ptr<arrow::RecordBatch> expected_result;
std::vector<std::string> expected_result_string = {
R"(["a","a","b","b","c","d","d","e","f","g","l","o","p","q","q","q","q","r","s","t","u","w","x","y","z",null,null,null,null,null,null,null,null,null,null])",
R"(["h","a","a","e",null,"f","f","f","c","h",null,"e","a","c","a","c","e",null,"e","f","h","c","f","g","e","g","g","j",null,"h","a","j","g","j","j"])"};
R"(["h","a","a","e",null,"f","f","f","c","h",null,"e","a","c","e","c","a",null,"e","f","h","c","f","g","e","g","g","j",null,"h","a","j","g","j","j"])"};
MakeInputBatch(expected_result_string, sch, &expected_result);
for (auto batch : input_batch_list) {
ASSERT_NOT_OK(sort_expr->evaluate(batch, &dummy_result_batches));
Expand All @@ -506,7 +506,7 @@ TEST(TestArrowComputeSort, SortTestOneKeyStr) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestOneKeyWithProjection) {
TEST(TestArrowComputeExternalSort, SortTestOneKeyWithProjection) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", utf8());
Expand Down Expand Up @@ -594,7 +594,7 @@ TEST(TestArrowComputeSort, SortTestOneKeyWithProjection) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestMultipleKeys) {
TEST(TestArrowComputeExternalSort, SortTestMultipleKeys) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", float32());
Expand Down Expand Up @@ -704,7 +704,7 @@ TEST(TestArrowComputeSort, SortTestMultipleKeys) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestMultipleKeysWithProjection) {
TEST(TestArrowComputeExternalSort, SortTestMultipleKeysWithProjection) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", uint32());
Expand Down Expand Up @@ -850,7 +850,7 @@ TEST(TestArrowComputeSort, SortTestMultipleKeysWithProjection) {
unsetenv("NATIVESQL_MAX_MEMORY_SIZE");
}

TEST(TestArrowComputeSort, SortTestMulKeyDecimalCodegen) {
TEST(TestArrowComputeExternalSort, SortTestMulKeyDecimalCodegen) {
setenv("NATIVESQL_MAX_MEMORY_SIZE", "300", 1);
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", decimal128(10, 4));
Expand Down
20 changes: 10 additions & 10 deletions native-sql-engine/cpp/src/tests/arrow_compute_test_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1269,9 +1269,9 @@ TEST(TestArrowComputeSort, SortTestOnekeyBooleanDesc) {
"false, false, false, false, false, false, false, false, false, false, "
"false, false, false, "
"false, false, false, false, false, false, false]",
"[18, 22, 8, 41, 1, 12, 12, 6, 4, 5, 2, 4, 6, 5, 2, 6, 10, 32, 78, 78, "
"11, 12, 12, 45, 2, "
"11, 16, 12, 1, 0, 6, 7, 4, 3, 2]"};
"[1, 5, 6, 4, 2, 4, 6, 12, 41, 8, 22, 12, 18, 5, 2, 2, 3, 4, 7, 6, "
"0, 1, 12, 16, 10, "
"11, 2, 45, 12, 78, 32, 6, 78, 11, 12]"};
MakeInputBatch(expected_result_string, sch, &expected_result);

for (auto batch : input_batch_list) {
Expand Down Expand Up @@ -1332,21 +1332,21 @@ TEST(TestArrowComputeSort, SortTestOneKeyStr) {
std::shared_ptr<ResultIteratorBase> sort_result_iterator_base;
std::vector<std::string> input_data_string = {
R"(["b", "q", "s", "t", null, null, "a"])",
R"(["a", "c", "e", "f", "g", null, "h"])"};
R"(["a", "1", "e", "f", "g", null, "h"])"};
MakeInputBatch(input_data_string, sch, &input_batch);
input_batch_list.push_back(input_batch);
std::vector<std::string> input_data_string_2 = {
R"([null, "f", "q", "d", "r", null, "g"])",
R"(["a", "c", "e", "f", null, "j", "h"])"};
R"(["a", "c", "2", "f", null, "j", "h"])"};
MakeInputBatch(input_data_string_2, sch, &input_batch);
input_batch_list.push_back(input_batch);
std::vector<std::string> input_data_string_3 = {
R"(["p", "q", "o", "e", null, null, "l"])",
R"(["a", "c", "e", "f", "g","j", null])"};
R"(["a", "3", "e", "f", "g","j", null])"};
MakeInputBatch(input_data_string_3, sch, &input_batch);
input_batch_list.push_back(input_batch);
std::vector<std::string> input_data_string_4 = {
R"(["q", "w", "z", "x", "y", null, "u"])", R"(["a", "c", "e", "f", "g","j", "h"])"};
R"(["q", "w", "z", "x", "y", null, "u"])", R"(["4", "c", "e", "f", "g","j", "h"])"};
MakeInputBatch(input_data_string_4, sch, &input_batch);
input_batch_list.push_back(input_batch);
std::vector<std::string> input_data_string_5 = {
Expand All @@ -1359,7 +1359,7 @@ TEST(TestArrowComputeSort, SortTestOneKeyStr) {
std::shared_ptr<arrow::RecordBatch> expected_result;
std::vector<std::string> expected_result_string = {
R"(["a","a","b","b","c","d","d","e","f","g","l","o","p","q","q","q","q","r","s","t","u","w","x","y","z",null,null,null,null,null,null,null,null,null,null])",
R"(["h","a","e","a",null,"f","f","f","c","h",null,"e","a","c","a","e","c",null,"e","f","h","c","f","g","e","g",null,"a","j","g","j","j","g","j","h"])"};
R"(["h","a","a","e",null,"f","f","f","c","h",null,"e","a","1","2","3","4",null,"e","f","h","c","f","g","e","g",null,"a","j","g","j","j","g","j","h"])"};
MakeInputBatch(expected_result_string, sch, &expected_result);
for (auto batch : input_batch_list) {
ASSERT_NOT_OK(sort_expr->evaluate(batch, &dummy_result_batches));
Expand Down Expand Up @@ -1444,8 +1444,8 @@ TEST(TestArrowComputeSort, SortTestOneKeyWithProjection) {
//////////////////////////////////////
std::shared_ptr<arrow::RecordBatch> expected_result;
std::vector<std::string> expected_result_string = {
R"(["a","a","b","B","C","D","d","E","F","g","l","o","p","q","q","Q","q","r","s","T","u","W","x","y","Z",null,null,null,null,null,null,null,null,null,null])",
R"(["h","a","e","a",null,"f","f","f","c","h",null,"e","a","c","a","e","c",null,"e","f","h","c","f","g","e","g",null,"a","j","g","j","j","g","j","h"])"};
R"(["a","a","B","b","C","d","D","E","F","g","l","o","p","q","Q","q","q","r","s","T","u","W","x","y","Z",null,null,null,null,null,null,null,null,null,null])",
R"(["h","a","a","e",null,"f","f","f","c","h",null,"e","a","c","e","c","a",null,"e","f","h","c","f","g","e","g",null,"a","j","g","j","j","g","j","h"])"};
MakeInputBatch(expected_result_string, sch, &expected_result);
for (auto batch : input_batch_list) {
ASSERT_NOT_OK(sort_expr->evaluate(batch, &dummy_result_batches));
Expand Down
36 changes: 14 additions & 22 deletions native-sql-engine/cpp/src/tests/jniutils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,30 +115,22 @@ TEST_F(JniUtilsTest, TestRecordBatchConcatenate) {
batches.push_back(batch1);
batches.push_back(batch2);

for (int i = 0; i < 100; i++) {
for (int i = 0; i < 10000; i++) {
int total_num_rows = batch1->num_rows() + batch2->num_rows();

// int num_columns = batches.at(0)->num_columns();
int num_columns = schema->num_fields();
arrow::ArrayVector arrayColumns;
for (jint i = 0; i < num_columns; i++) {
arrow::ArrayVector arrvec;
for (const auto& batch : batches) {
arrvec.push_back(batch->column(i));
}
std::shared_ptr<arrow::Array> bigArr;
ASSERT_OK_AND_ASSIGN(bigArr, Concatenate(arrvec, default_memory_pool()))
// ARROW_ASSIGN_OR_RAISE(auto bigArr, Concatenate(arrvec, pool));
arrayColumns.push_back(bigArr);
}
auto out_batch = arrow::RecordBatch::Make(schema, total_num_rows, arrayColumns);

std::cout << "out_batch->num_rows():" << out_batch->num_rows() << std::endl;
int total_num_rows = batch1->num_rows() + batch2->num_rows();

// int num_columns = batches.at(0)->num_columns();
int num_columns = schema->num_fields();
arrow::ArrayVector arrayColumns;
for (jint i = 0; i < num_columns; i++) {
arrow::ArrayVector arrvec;
for (const auto& batch : batches) {
arrvec.push_back(batch->column(i));
}

sleep(3);
std::shared_ptr<arrow::Array> bigArr;
ASSERT_OK_AND_ASSIGN(bigArr, Concatenate(arrvec, default_memory_pool()))
// ARROW_ASSIGN_OR_RAISE(auto bigArr, Concatenate(arrvec, pool));
arrayColumns.push_back(bigArr);
}
auto out_batch = arrow::RecordBatch::Make(schema, total_num_rows, arrayColumns);
}

TEST_F(JniUtilsTest, TestMakeRecordBatchBuild_Int_Struct) {
Expand Down