Skip to content

Commit

Permalink
Test: Refine fine-grained shuffle ut (#6557)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored Dec 28, 2022
1 parent 156e216 commit 5bd0e89
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 83 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockExecutor/AggregationBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ void AggregationBinder::toMPPSubPlan(size_t & executor_index, const DAGPropertie
}

std::shared_ptr<ExchangeSenderBinder> exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys, fine_grained_shuffle_stream_count);
exchange_sender->children.push_back(partial_agg);

std::shared_ptr<ExchangeReceiverBinder> exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, output_schema_for_partial_agg);
= std::make_shared<ExchangeReceiverBinder>(executor_index, output_schema_for_partial_agg, fine_grained_shuffle_stream_count);
exchange_map[exchange_receiver->name] = std::make_pair(exchange_receiver, exchange_sender);

/// re-construct agg_exprs and gby_exprs in final_agg
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
tipb_executor->set_executor_id(name);
tipb::ExchangeSender * exchange_sender = tipb_executor->mutable_exchange_sender();
exchange_sender->set_tp(type);
if (tipb_executor->exchange_sender().tp() == tipb::Hash)
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
for (auto i : partition_keys)
{
auto * expr = exchange_sender->add_partition_keys();
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ namespace DB::mock
class ExchangeSenderBinder : public ExecutorBinder
{
public:
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {})
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0)
: ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output)
, type(type_)
, partition_keys(partition_keys_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}

bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
Expand All @@ -38,6 +39,7 @@ class ExchangeSenderBinder : public ExecutorBinder
tipb::ExchangeType type;
TaskMetas task_metas;
std::vector<size_t> partition_keys;
uint64_t fine_grained_shuffle_stream_count;
};

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockExecutor/JoinBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@ void JoinBinder::toMPPSubPlan(size_t & executor_index, const DAGProperties & pro
}

std::shared_ptr<ExchangeSenderBinder> left_exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys, fine_grained_shuffle_stream_count);
left_exchange_sender->children.push_back(children[0]);
std::shared_ptr<ExchangeSenderBinder> right_exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, children[1]->output_schema, tipb::Hash, right_partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, children[1]->output_schema, tipb::Hash, right_partition_keys, fine_grained_shuffle_stream_count);
right_exchange_sender->children.push_back(children[1]);

std::shared_ptr<ExchangeReceiverBinder> left_exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[0]->output_schema);
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[0]->output_schema, fine_grained_shuffle_stream_count);
std::shared_ptr<ExchangeReceiverBinder> right_exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[1]->output_schema);
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[1]->output_schema, fine_grained_shuffle_stream_count);
children[0] = left_exchange_receiver;
children[1] = right_exchange_receiver;

Expand Down
19 changes: 9 additions & 10 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace tests
types_col_name[a], types_col_name[b] \
}

class ExecutorAggTestRunner : public ExecutorTest
class AggExecutorTestRunner : public ExecutorTest
{
public:
using ColStringNullableType = std::optional<typename TypeTraits<String>::FieldType>;
Expand Down Expand Up @@ -61,7 +61,7 @@ class ExecutorAggTestRunner : public ExecutorTest
using ColumnWithFloat64 = std::vector<ColFloat64Type>;
using ColumnWithString = std::vector<ColStringType>;

~ExecutorAggTestRunner() override = default;
~AggExecutorTestRunner() override = default;

void initializeContext() override
{
Expand Down Expand Up @@ -225,7 +225,7 @@ class ExecutorAggTestRunner : public ExecutorTest
};

/// Guarantee the correctness of group by
TEST_F(ExecutorAggTestRunner, GroupBy)
TEST_F(AggExecutorTestRunner, GroupBy)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -296,7 +296,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggregationMaxAndMin)
TEST_F(AggExecutorTestRunner, AggregationMaxAndMin)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -344,7 +344,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggregationCount)
TEST_F(AggExecutorTestRunner, AggregationCount)
try
{
/// Prepare some data
Expand Down Expand Up @@ -388,8 +388,7 @@ CATCH

// TODO support more types for min, max, count.
// support more aggregation functions: sum, first_row, group_concat

TEST_F(ExecutorAggTestRunner, AggregationCountGroupByFastPathMultiKeys)
TEST_F(AggExecutorTestRunner, AggregationCountGroupByFastPathMultiKeys)
try
{
/// Prepare some data
Expand Down Expand Up @@ -492,7 +491,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggNull)
TEST_F(AggExecutorTestRunner, AggNull)
try
{
auto request = context
Expand All @@ -509,7 +508,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, RepeatedAggregateFunction)
TEST_F(AggExecutorTestRunner, RepeatedAggregateFunction)
try
{
std::vector<ASTPtr> functions = {Max(col("s1")), Min(col("s1")), Sum(col("s2"))};
Expand Down Expand Up @@ -557,7 +556,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggMerge)
TEST_F(AggExecutorTestRunner, AggMerge)
try
{
std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3"};
Expand Down
52 changes: 0 additions & 52 deletions dbms/src/Flash/tests/gtest_compute_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,49 +502,6 @@ try
}
CATCH

/// Test helper for the FineGrainedShuffleJoin/Agg cases: walks one path down the executor tree
/// starting at `root` and sets the fine grained shuffle stream count on the exchange executors
/// found along that path.
///
/// - Selection / Aggregation / Projection: descend into the single child.
/// - Join: descend into the build side child only.
/// - ExchangeSender: set the stream count only when the exchange type is Hash, then descend.
/// - ExchangeReceiver: set the stream count and stop.
/// - TableScan: stop.
/// Any other executor type results in a TiFlashException.
void setFineGrainedShuffleForExchange(tipb::Executor & root)
{
    /// Stream count written to every matching exchange executor.
    constexpr auto stream_count = 8;

    /// The child accessors used below are read through const references, while the walk needs a
    /// writable pointer to call the setter on the next node; hence the const_cast.
    const auto as_mutable = [](const tipb::Executor & executor) {
        return const_cast<tipb::Executor *>(&executor);
    };

    for (tipb::Executor * node = &root; node != nullptr;)
    {
        /// Stays nullptr for the executors that terminate the walk.
        tipb::Executor * next = nullptr;
        switch (node->tp())
        {
        case tipb::ExecType::TypeSelection:
            next = as_mutable(node->selection().child());
            break;
        case tipb::ExecType::TypeAggregation:
            next = as_mutable(node->aggregation().child());
            break;
        case tipb::ExecType::TypeProjection:
            next = as_mutable(node->projection().child());
            break;
        case tipb::ExecType::TypeJoin:
        {
            /// Only the build side path is followed.
            JoinInterpreterHelper::TiFlashJoin tiflash_join{node->join()};
            next = as_mutable(node->join().children()[tiflash_join.build_side_index]);
            break;
        }
        case tipb::ExecType::TypeExchangeSender:
            if (node->exchange_sender().tp() == tipb::Hash)
                node->set_fine_grained_shuffle_stream_count(stream_count);
            next = as_mutable(node->exchange_sender().child());
            break;
        case tipb::ExecType::TypeExchangeReceiver:
            node->set_fine_grained_shuffle_stream_count(stream_count);
            break;
        case tipb::ExecType::TypeTableScan:
            break;
        default:
            throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal);
        }
        node = next;
    }
}

TEST_F(ComputeServerRunner, runFineGrainedShuffleJoinTest)
try
{
Expand Down Expand Up @@ -578,10 +535,6 @@ try
.join(context.scan("test_db", "r_table_2"), join_type, {col("s1"), col("s2")}, enable)
.project({col("l_table_2.s1"), col("l_table_2.s2"), col("l_table_2.s3")});
auto tasks = request2.buildMPPTasks(context, properties);
for (auto & task : tasks)
{
setFineGrainedShuffleForExchange(const_cast<tipb::Executor &>(task.dag_request->root_executor()));
}
const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap());
ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols);
}
Expand All @@ -606,11 +559,6 @@ try
.scan("test_db", "test_table_2")
.aggregation({Max(col("s3"))}, {col("s1"), col("s2")}, enable);
auto tasks = request2.buildMPPTasks(context, properties);
for (auto & task : tasks)
{
setFineGrainedShuffleForExchange(const_cast<tipb::Executor &>(task.dag_request->root_executor()));
}

const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap());
ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols);
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/tests/gtest_limit_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DB
namespace tests
{

class ExecutorLimitTestRunner : public DB::tests::ExecutorTest
class LimitExecutorTestRunner : public DB::tests::ExecutorTest
{
public:
using ColDataType = std::optional<typename TypeTraits<String>::FieldType>;
Expand All @@ -47,7 +47,7 @@ class ExecutorLimitTestRunner : public DB::tests::ExecutorTest
const ColumnWithData col0{"col0-0", {}, "col0-2", "col0-3", {}, "col0-5", "col0-6", "col0-7"};
};

TEST_F(ExecutorLimitTestRunner, Limit)
TEST_F(LimitExecutorTestRunner, Limit)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -77,7 +77,7 @@ try
}
CATCH

TEST_F(ExecutorLimitTestRunner, RawQuery)
TEST_F(LimitExecutorTestRunner, RawQuery)
try
{
String query = "select * from test_db.projection_test_table limit 1";
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/tests/gtest_projection_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DB
namespace tests
{

class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest
class ProjectionExecutorTestRunner : public DB::tests::ExecutorTest
{
public:
using ColDataString = std::vector<std::optional<typename TypeTraits<String>::FieldType>>;
Expand Down Expand Up @@ -94,7 +94,7 @@ class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest
const String table_name{"projection_test_table"};
};

TEST_F(ExecutorProjectionTestRunner, Projection)
TEST_F(ProjectionExecutorTestRunner, Projection)
try
{
/// Check single column
Expand Down Expand Up @@ -141,7 +141,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, ProjectionFunction)
TEST_F(ProjectionExecutorTestRunner, ProjectionFunction)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -231,7 +231,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, MultiFunction)
TEST_F(ProjectionExecutorTestRunner, MultiFunction)
try
{
MockAstVec functions = {
Expand Down Expand Up @@ -308,7 +308,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, MultiProjection)
TEST_F(ProjectionExecutorTestRunner, MultiProjection)
try
{
auto req = context
Expand Down Expand Up @@ -375,7 +375,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, ProjectionThenAgg)
TEST_F(ProjectionExecutorTestRunner, ProjectionThenAgg)
try
{
auto req = context
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Flash/tests/gtest_topn_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ namespace DB
{
namespace tests
{
class ExecutorTopNTestRunner : public DB::tests::ExecutorTest

class TopNExecutorTestRunner : public DB::tests::ExecutorTest
{
public:
using ColStringType = std::optional<typename TypeTraits<String>::FieldType>;
Expand Down Expand Up @@ -85,7 +86,7 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockColumnNameVec out_proj_ast = {})
{
if (func_proj_ast.size() == 0)
if (func_proj_ast.empty())
return context.scan(db_name, table_name).topN(order_by_items, limit).build(context);
else
return context.scan(db_name, table_name).project(func_proj_ast).topN(order_by_items, limit).project(out_proj_ast).build(context);
Expand All @@ -106,7 +107,7 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest
ColumnWithInt32 col_salary{1300, 0, {}, 900, {}, -300};
};

TEST_F(ExecutorTopNTestRunner, TopN)
TEST_F(TopNExecutorTestRunner, TopN)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -173,7 +174,7 @@ try
}
CATCH

TEST_F(ExecutorTopNTestRunner, TopNFunction)
TEST_F(TopNExecutorTestRunner, TopNFunction)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -245,7 +246,7 @@ try
}
CATCH

TEST_F(ExecutorTopNTestRunner, BigTable)
TEST_F(TopNExecutorTestRunner, BigTable)
try
{
std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3"};
Expand Down

0 comments on commit 5bd0e89

Please sign in to comment.