Skip to content

Commit

Permalink
revert gtests
Browse files Browse the repository at this point in the history
Signed-off-by: Zhigao Tong <[email protected]>
  • Loading branch information
solotzg committed Feb 2, 2023
1 parent 2b1e538 commit 415068c
Showing 1 changed file with 6 additions and 197 deletions.
203 changes: 6 additions & 197 deletions dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <TestUtils/ColumnGenerator.h>
#include <TestUtils/ExecutorTestUtils.h>

#include <ext/enumerate.h>
Expand Down Expand Up @@ -49,12 +48,12 @@ class JoinExecutorTestRunner : public DB::tests::ExecutorTest
toVec<String>("join_c", {"apple", "banana"})});

context.addExchangeReceiver("exchange_r_table",
{{"s", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
{{"s1", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
{toNullableVec<String>("s", {"banana", "banana"}),
toNullableVec<String>("join_c", {"apple", "banana"})});

context.addExchangeReceiver("exchange_l_table",
{{"s", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
{{"s1", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
{toNullableVec<String>("s", {"banana", "banana"}),
toNullableVec<String>("join_c", {"apple", "banana"})});
}
Expand Down Expand Up @@ -355,16 +354,11 @@ try
executeAndAssertColumnsEqual(cast_request(), {createNullableColumn<Decimal256>(std::make_tuple(65, 0), {"0.12"}, {0}), createNullableColumn<Decimal256>(std::make_tuple(65, 0), {"0.12"}, {0})});

/// datetime(1970-01-01 00:00:01) == timestamp(1970-01-01 00:00:01)
context.addMockTable("cast", "t1", {{"datetime", TiDB::TP::TypeDatetime}}, {createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 6)});
context.addMockTable("cast", "t1", {{"a", TiDB::TP::TypeDatetime}}, {createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 6)});

context.addMockTable("cast", "t2", {{"datetime", TiDB::TP::TypeTimestamp}}, {createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 6)});
context.addMockTable("cast", "t2", {{"a", TiDB::TP::TypeTimestamp}}, {createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 6)});

auto cast_request_1 = [&]() {
return context.scan("cast", "t1")
.join(context.scan("cast", "t2"), tipb::JoinType::TypeInnerJoin, {col("datetime")})
.build(context);
};
executeAndAssertColumnsEqual(cast_request_1(), {createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 0), createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 0)});
executeAndAssertColumnsEqual(cast_request(), {createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 0), createDateTimeColumn({{{1970, 1, 1, 0, 0, 1, 0}}}, 0)});
}
CATCH

Expand Down Expand Up @@ -706,7 +700,7 @@ CATCH
TEST_F(JoinExecutorTestRunner, SplitJoinResult)
try
{
context.addMockTable("split_test", "t1", {{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}), toVec<Int32>("b", {1, 1, 3, 3, 1, 1, 3, 3, 1, 3})});
context.addMockTable("split_test", "t1", {{"a", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {1, 1, 1, 1, 1, 1, 1, 1, 1, 1})});
context.addMockTable("split_test", "t2", {{"a", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {1, 1, 1, 1, 1})});

auto request = context
Expand All @@ -726,191 +720,6 @@ try
ASSERT_EQ(expect[i][j], blocks[j].rows());
}
}

// with other condition
const auto cond = gt(col("b"), lit(Field(static_cast<Int64>(2))));
request = context
.scan("split_test", "t1")
.join(context.scan("split_test", "t2"), tipb::JoinType::TypeInnerJoin, {col("a")}, {}, {}, {cond}, {})

.build(context);
expect = {{5, 5, 5, 5, 5}, {5, 5, 5, 5, 5}, {5, 5, 5, 5, 5}, {25}, {25}, {25}, {25}, {25}};
for (size_t i = 0; i < block_sizes.size(); ++i)
{
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(block_sizes[i])));
auto blocks = getExecuteStreamsReturnBlocks(request);
ASSERT_EQ(expect[i].size(), blocks.size());
for (size_t j = 0; j < blocks.size(); ++j)
{
ASSERT_EQ(expect[i][j], blocks[j].rows());
}
}
// test non joined data
context.addMockTable("split_test", "t3", {{"a", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {2, 2, 2, 2, 2})});
context.addMockTable("split_test", "t4", {{"a", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3})});
request = context
.scan("split_test", "t3")
.join(context.scan("split_test", "t4"), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.build(context);

expect = {{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
{2, 2, 2, 2, 2, 2, 2, 2, 2, 2},
{7, 7, 6},
{20},
{20},
{20},
{20},
{20}};
for (size_t i = 0; i < block_sizes.size(); ++i)
{
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(block_sizes[i])));
auto blocks = getExecuteStreamsReturnBlocks(request);
ASSERT_EQ(expect[i].size(), blocks.size());
for (size_t j = 0; j < blocks.size(); ++j)
{
ASSERT_EQ(expect[i][j], blocks[j].rows());
}
}
}
CATCH

TEST_F(JoinExecutorTestRunner, NonJoinedData)
try
{
DB::MockColumnInfoVec left_column_infos{{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}};
DB::MockColumnInfoVec right_column_infos{{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}};
DB::MockColumnInfoVec right_partition_column_infos{{"a", TiDB::TP::TypeLong}};
ColumnsWithTypeAndName left_column_data;
ColumnsWithTypeAndName right_column_data;
ColumnsWithTypeAndName common_column_data;
size_t table_rows = 102400;
size_t common_rows = 20480;
UInt64 max_block_size = 800;
size_t original_max_streams = 20;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
{
ColumnGeneratorOpts opts{common_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
common_column_data.push_back(ColumnGenerator::instance().generate(opts));
}

for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
{
ColumnGeneratorOpts opts{table_rows - common_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
left_column_data.push_back(ColumnGenerator::instance().generate(opts));
}

for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(right_column_infos))
{
ColumnGeneratorOpts opts{table_rows - common_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
right_column_data.push_back(ColumnGenerator::instance().generate(opts));
}

for (size_t i = 0; i < common_column_data.size(); ++i)
{
left_column_data[i].column->assumeMutable()->insertRangeFrom(*common_column_data[i].column, 0, common_rows);
right_column_data[i].column->assumeMutable()->insertRangeFrom(*common_column_data[i].column, 0, common_rows);
}

ColumnWithTypeAndName shuffle_column = ColumnGenerator::instance().generate({table_rows, "UInt64", RANDOM});
IColumn::Permutation perm;
shuffle_column.column->getPermutation(false, 0, -1, perm);
for (auto & column : left_column_data)
{
column.column = column.column->permute(perm, 0);
}
for (auto & column : right_column_data)
{
column.column = column.column->permute(perm, 0);
}

context.addMockTable("outer_join_test", "left_table_1_concurrency", left_column_infos, left_column_data, 1);
context.addMockTable("outer_join_test", "left_table_3_concurrency", left_column_infos, left_column_data, 3);
context.addMockTable("outer_join_test", "left_table_5_concurrency", left_column_infos, left_column_data, 5);
context.addMockTable("outer_join_test", "left_table_10_concurrency", left_column_infos, left_column_data, 10);
context.addMockTable("outer_join_test", "right_table_1_concurrency", right_column_infos, right_column_data, 1);
context.addMockTable("outer_join_test", "right_table_3_concurrency", right_column_infos, right_column_data, 3);
context.addMockTable("outer_join_test", "right_table_5_concurrency", right_column_infos, right_column_data, 5);
context.addMockTable("outer_join_test", "right_table_10_concurrency", right_column_infos, right_column_data, 10);
context.addExchangeReceiver("right_exchange_receiver_1_concurrency", right_column_infos, right_column_data, 1, right_partition_column_infos);
context.addExchangeReceiver("right_exchange_receiver_3_concurrency", right_column_infos, right_column_data, 3, right_partition_column_infos);
context.addExchangeReceiver("right_exchange_receiver_5_concurrency", right_column_infos, right_column_data, 5, right_partition_column_infos);
context.addExchangeReceiver("right_exchange_receiver_10_concurrency", right_column_infos, right_column_data, 10, right_partition_column_infos);
std::vector<String> left_table_names = {"left_table_1_concurrency", "left_table_3_concurrency", "left_table_5_concurrency", "left_table_10_concurrency"};
std::vector<String> right_table_names = {"right_table_1_concurrency", "right_table_3_concurrency", "right_table_5_concurrency", "right_table_10_concurrency"};
std::vector<size_t> right_exchange_receiver_concurrency = {1, 3, 5, 10};

/// case 1, right join without right condition
auto request = context
.scan("outer_join_test", right_table_names[0])
.join(context.scan("outer_join_test", left_table_names[0]), tipb::JoinType::TypeLeftOuterJoin, {col("a")})
.project({fmt::format("{}.a", left_table_names[0]), fmt::format("{}.b", left_table_names[0]), fmt::format("{}.a", right_table_names[0]), fmt::format("{}.b", right_table_names[0])})
.build(context);
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use right_table left join left_table as the reference
auto ref_columns = executeStreams(request, original_max_streams);

/// case 1.1 table scan join table scan
for (auto & left_table_name : left_table_names)
{
for (auto & right_table_name : right_table_names)
{
request = context
.scan("outer_join_test", left_table_name)
.join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
}
}
/// case 1.2 table scan join fine grained exchange receiver
for (auto & left_table_name : left_table_names)
{
for (size_t exchange_concurrency : right_exchange_receiver_concurrency)
{
request = context
.scan("outer_join_test", left_table_name)
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {}, {}, {}, exchange_concurrency)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
}
}
/// case 2, right join with right condition
request = context
.scan("outer_join_test", right_table_names[0])
.join(context.scan("outer_join_test", left_table_names[0]), tipb::JoinType::TypeLeftOuterJoin, {col("a")}, {gt(col(right_table_names[0] + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, {}, 0)
.project({fmt::format("{}.a", left_table_names[0]), fmt::format("{}.b", left_table_names[0]), fmt::format("{}.a", right_table_names[0]), fmt::format("{}.b", right_table_names[0])})
.build(context);
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use right_table left join left_table as the reference
ref_columns = executeStreams(request, original_max_streams);
/// case 2.1 table scan join table scan
for (auto & left_table_name : left_table_names)
{
for (auto & right_table_name : right_table_names)
{
request = context
.scan("outer_join_test", left_table_name)
.join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, 0)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
}
}
/// case 2.2 table scan join fine grained exchange receiver
for (auto & left_table_name : left_table_names)
{
for (size_t exchange_concurrency : right_exchange_receiver_concurrency)
{
String exchange_name = fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency);
request = context
.scan("outer_join_test", left_table_name)
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(exchange_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, exchange_concurrency)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
}
}
}
CATCH

Expand Down

0 comments on commit 415068c

Please sign in to comment.