Skip to content

Commit

Permalink
modify gtests
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxin9014 committed Mar 7, 2023
1 parent 16bf061 commit 8c4f02a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ if (ENABLE_TESTS)
)
target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_compile_definitions(gtests_dbms PUBLIC DBMS_PUBLIC_GTEST)
target_compile_definitions(dbms PUBLIC DBMS_PUBLIC_GTEST)
target_compile_definitions(dbms PUBLIC MULTIPLE_CONTEXT_GTEST)

target_link_libraries(gtests_dbms test_util_gtest_main tiflash_functions tiflash-dttool-lib delta_merge)
Expand Down
45 changes: 19 additions & 26 deletions dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,7 @@ try
// for spill to disk tests
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(10000)));
ASSERT_THROW(executeAndAssertColumnsEqual(request, expected_cols[i * simple_test_num + j], {1}), Exception);
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(25000)));
executeAndAssertColumnsEqual(request, expected_cols[i * simple_test_num + j], {2});
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(75000)));
executeAndAssertColumnsEqual(request, expected_cols[i * simple_test_num + j], {5});
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(170000)));
executeAndAssertColumnsEqual(request, expected_cols[i * simple_test_num + j], {10});
executeAndAssertColumnsEqual(request, expected_cols[i * simple_test_num + j], {2, 5, 10});
}
}
}
Expand Down Expand Up @@ -752,13 +747,15 @@ try
.join(context.scan("split_test", "t2"), tipb::JoinType::TypeInnerJoin, {col("a")})
.build(context);

auto join_restore_concurrences = {-1, 0, 1, 5};
const ColumnsWithTypeAndName expect = {toNullableVec<Int32>({1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 0, 0, 0}), toNullableVec<Int32>({2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}), toNullableVec<Int32>({1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 0, 0, 0})};
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(10000)));
ASSERT_THROW(executeAndAssertColumnsEqual(request, expect, {1}, {DEFAULT_BLOCK_SIZE}), Exception);
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(50000)));
executeAndAssertColumnsEqual(request, expect, {2, 5}, {1, 2, 5, DEFAULT_BLOCK_SIZE});
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(120000)));
executeAndAssertColumnsEqual(request, expect, {10}, {1, 2, 5, DEFAULT_BLOCK_SIZE});
for (const auto & join_restore_concurrency : join_restore_concurrences)
{
context.context.setSetting("join_restore_concurrency", Field(static_cast<UInt64>(join_restore_concurrency)));
ASSERT_THROW(executeAndAssertColumnsEqual(request, expect, {1}, {DEFAULT_BLOCK_SIZE}), Exception);
executeAndAssertColumnsEqual(request, expect, {2, 5, 10}, {1, 2, 5, DEFAULT_BLOCK_SIZE});
}
}
CATCH

Expand All @@ -771,8 +768,8 @@ try
ColumnsWithTypeAndName left_column_data;
ColumnsWithTypeAndName right_column_data;
ColumnsWithTypeAndName common_column_data;
size_t table_rows = 102400;
size_t common_rows = 20480;
size_t table_rows = 51200;
size_t common_rows = 10240;
UInt64 max_block_size = 800;
size_t original_max_streams = 20;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
Expand Down Expand Up @@ -824,10 +821,9 @@ try
context.addExchangeReceiver("right_exchange_receiver_5_concurrency", right_column_infos, right_column_data, 5, right_partition_column_infos);
context.addExchangeReceiver("right_exchange_receiver_10_concurrency", right_column_infos, right_column_data, 10, right_partition_column_infos);
std::vector<String> left_table_names = {"left_table_1_concurrency", "left_table_3_concurrency", "left_table_5_concurrency", "left_table_10_concurrency"};
std::vector<String> right_table_names = {"right_table_3_concurrency", "right_table_5_concurrency", "right_table_10_concurrency"};
std::vector<String> right_table_names = {"right_table_1_concurrency", "right_table_3_concurrency", "right_table_5_concurrency", "right_table_10_concurrency"};
std::vector<size_t> right_exchange_receiver_concurrency = {1, 3, 5, 10};
UInt64 max_bytes_before_external_join = 23292240;

UInt64 max_bytes_before_external_join = 20000;
/// case 1, right join without right condition
auto request = context
.scan("outer_join_test", right_table_names[0])
Expand All @@ -852,11 +848,9 @@ try
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
// test spill to disk
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (right_table_name == "right_table_1_concurrency" && left_table_name == "left_table_1_concurrency")
if (right_table_name == "right_table_1_concurrency")
{
max_bytes_before_external_join = 100000;
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
max_bytes_before_external_join = 23292240;
}
else
{
Expand All @@ -878,14 +872,13 @@ try
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);

// test spill to disk
if (exchange_concurrency == 1 && left_table_name == "left_table_1_concurrency")
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (exchange_concurrency == 1)
{
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(100000)));
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
}
else
{
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
}
}
Expand All @@ -898,6 +891,7 @@ try
.build(context);
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use right_table left join left_table as the reference
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
ref_columns = executeStreams(request, original_max_streams);
/// case 2.1 table scan join table scan
for (auto & left_table_name : left_table_names)
Expand All @@ -914,7 +908,7 @@ try

// test spill to disk
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (right_table_name == "right_table_1_concurrency" && left_table_name == "left_table_1_concurrency")
if (right_table_name == "right_table_1_concurrency")
{
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
}
Expand All @@ -939,14 +933,13 @@ try
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);

// test spill to disk
if (exchange_concurrency == 1 && left_table_name == "left_table_1_concurrency")
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (exchange_concurrency == 1)
{
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(100000)));
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
}
else
{
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
}
}
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1099,13 +1099,18 @@ void Join::insertFromBlock(const Block & block, size_t stream_index)
}
if likely (map_size_after_insert > map_size_before_insert)
{
join_partition->memory_usage += (pool_size_after_insert - pool_size_before_insert);
join_partition->memory_usage += (map_size_after_insert - map_size_before_insert);
}
continue;
}
}
build_spiller->spillBlocks(blocks_to_spill, i);
}
#ifdef DBMS_PUBLIC_GTEST
// for join spill to disk gtest
if (restore_round == 1)
return;
#endif
spillMostMemoryUsedPartitionIfNeed();
}
}
Expand Down

0 comments on commit 8c4f02a

Please sign in to comment.