Skip to content

Commit

Permalink
spill support fine grained shuffle (pingcap#8)
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker authored Mar 7, 2023
1 parent 8c4f02a commit cd4f772
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 126 deletions.
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()
size_t partition_index = 0;
Block block;

if (!join->isEnableSpill())
if (!join->isSpilled())
{
block = current_probe_stream->read();
}
Expand All @@ -148,7 +148,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()

if (!block)
{
if (join->isEnableSpill())
if (join->isSpilled())
{
block = current_probe_stream->read();
if (block)
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Flash/Statistics/JoinImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ namespace DB
void JoinStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
{
fmt_buffer.fmtAppend(
R"("hash_table_bytes":{},"build_side_child":"{}",)"
R"("peak_build_bytes_usage":{},"build_side_child":"{}","is_spill_enabled":{},"is_spilled":{})"
R"("non_joined_outbound_rows":{},"non_joined_outbound_blocks":{},"non_joined_outbound_bytes":{},"non_joined_execution_time_ns":{},)"
R"("join_build_inbound_rows":{},"join_build_inbound_blocks":{},"join_build_inbound_bytes":{},"join_build_execution_time_ns":{})",
hash_table_bytes,
peak_build_bytes_usage,
build_side_child,
is_spill_enabled,
is_spilled,
non_joined_base.rows,
non_joined_base.blocks,
non_joined_base.bytes,
Expand All @@ -42,8 +44,10 @@ void JoinStatistics::collectExtraRuntimeDetail()
if (it != join_execute_info_map.end())
{
const auto & join_execute_info = it->second;
hash_table_bytes = join_execute_info.join_ptr->getTotalByteCount();
peak_build_bytes_usage = join_execute_info.join_ptr->getPeakBuildBytesUsage();
build_side_child = join_execute_info.build_side_root_executor_id;
is_spill_enabled = join_execute_info.join_ptr->isEnableSpill();
is_spilled = join_execute_info.join_ptr->isSpilled();
for (const auto & non_joined_stream : join_execute_info.non_joined_streams)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(non_joined_stream.get()); p_stream)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Statistics/JoinImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ class JoinStatistics : public JoinStatisticsBase
JoinStatistics(const tipb::Executor * executor, DAGContext & dag_context_);

private:
size_t hash_table_bytes = 0;
size_t peak_build_bytes_usage = 0;
String build_side_child;
bool is_spill_enabled = false;
bool is_spilled = false;

BaseRuntimeStatistics non_joined_base;

Expand Down
41 changes: 22 additions & 19 deletions dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ try
size_t table_rows = 51200;
size_t common_rows = 10240;
UInt64 max_block_size = 800;
size_t original_max_streams = 20;
size_t max_streams_big = 20;
size_t max_streams_small = 4;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
{
ColumnGeneratorOpts opts{common_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
Expand Down Expand Up @@ -832,7 +833,7 @@ try
.build(context);
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use right_table left join left_table as the reference
auto ref_columns = executeStreams(request, original_max_streams);
auto ref_columns = executeStreams(request, max_streams_big);

/// case 1.1 table scan join table scan
for (auto & left_table_name : left_table_names)
Expand All @@ -844,17 +845,18 @@ try
.join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.build(context);
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
// test spill to disk
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (right_table_name == "right_table_1_concurrency")
{
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
}
else
{
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
}
}
}
Expand All @@ -868,18 +870,19 @@ try
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {}, {}, {}, exchange_concurrency)
.build(context);
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));

// test spill to disk
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (exchange_concurrency == 1)
{
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
}
else
{
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
}
}
}
Expand All @@ -892,7 +895,7 @@ try
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use right_table left join left_table as the reference
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
ref_columns = executeStreams(request, original_max_streams);
ref_columns = executeStreams(request, max_streams_big);
/// case 2.1 table scan join table scan
for (auto & left_table_name : left_table_names)
{
Expand All @@ -903,18 +906,18 @@ try
.join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, 0)
.build(context);
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));

// test spill to disk
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (right_table_name == "right_table_1_concurrency")
{
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
}
else
{
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
}
}
}
Expand All @@ -929,18 +932,18 @@ try
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(exchange_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, exchange_concurrency)
.build(context);
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));

// test spill to disk
context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
if (exchange_concurrency == 1)
{
ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
}
else
{
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
}
}
}
Expand Down
Loading

0 comments on commit cd4f772

Please sign in to comment.