Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Parallelize the converting hash table to two level process after finishing hash agg build #8957

Merged
merged 13 commits into from
Apr 25, 2024
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ namespace DB
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache)
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h>
#include <Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h>

namespace DB
{
/// Spawns one AggregateFinalConvertTask per recorded hash-table index so the
/// conversions run in parallel; each task shares ownership of agg_context.
void AggregateFinalConvertEvent::scheduleImpl()
{
    assert(agg_context);
    const auto & req_id = log->identifier();
    for (const auto & index : indexes)
    {
        auto task = std::make_unique<AggregateFinalConvertTask>(
            exec_context,
            req_id,
            shared_from_this(),
            agg_context,
            index);
        addTask(std::move(task));
    }
}

void AggregateFinalConvertEvent::finishImpl()
{
auto dur = getFinishDuration();
for (const auto & profile_info : profile_infos)
profile_info->execution_time += dur;
agg_context.reset();
}
} // namespace DB
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/Events/Event.h>
#include <Operators/OperatorProfileInfo.h>

namespace DB
{
class AggregateContext;
using AggregateContextPtr = std::shared_ptr<AggregateContext>;

/// Event scheduled after the hash-agg build finishes; its tasks convert the
/// hash tables at the recorded `indexes` to the two-level layout in parallel
/// (see AggregateFinalConvertTask, which calls convertToTwoLevel per index).
class AggregateFinalConvertEvent : public Event
{
public:
    /// @param agg_context_   shared aggregate build state; must be non-null.
    /// @param indexes_       indexes of the hash tables still to be converted;
    ///                       must be non-empty.
    /// @param profile_infos_ operator profiles to charge the event duration to
    ///                       in finishImpl; must be non-empty.
    AggregateFinalConvertEvent(
        PipelineExecutorContext & exec_context_,
        const String & req_id,
        AggregateContextPtr agg_context_,
        std::vector<size_t> && indexes_,
        OperatorProfileInfos && profile_infos_)
        : Event(exec_context_, req_id)
        , agg_context(std::move(agg_context_))
        , indexes(std::move(indexes_))
        , profile_infos(std::move(profile_infos_))
    {
        assert(agg_context);
        assert(!indexes.empty());
        assert(!profile_infos.empty());
    }

protected:
    // Creates one conversion task per index.
    void scheduleImpl() override;

    // Accounts execution time into profile_infos and releases agg_context.
    void finishImpl() override;

private:
    AggregateContextPtr agg_context;
    // Hash-table indexes that still need conversion to two-level.
    std::vector<size_t> indexes;

    OperatorProfileInfos profile_infos;
};
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class AggregateFinalSpillEvent : public Event
PipelineExecutorContext & exec_context_,
const String & req_id,
AggregateContextPtr agg_context_,
std::vector<size_t> indexes_,
OperatorProfileInfos profile_infos_)
std::vector<size_t> && indexes_,
OperatorProfileInfos && profile_infos_)
: Event(exec_context_, req_id)
, agg_context(std::move(agg_context_))
, indexes(std::move(indexes_))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h>
#include <Operators/AggregateContext.h>

namespace DB
{
/// Binds a single hash-table index to this task; the task takes shared
/// ownership of the aggregate context so the table stays valid until
/// executeImpl runs.
AggregateFinalConvertTask::AggregateFinalConvertTask(
    PipelineExecutorContext & exec_context_,
    const String & req_id,
    const EventPtr & event_,
    AggregateContextPtr agg_context_,
    size_t index_)
    : EventTask(exec_context_, req_id, event_)
    , agg_context(std::move(agg_context_))
    , index(index_)
{
    assert(agg_context);
}

/// Converts the hash table at `index` to the two-level layout and finishes in
/// a single call, releasing the context reference on the way out.
ExecTaskStatus AggregateFinalConvertTask::executeImpl()
{
    // Moving out of the member leaves agg_context empty, equivalent to reset().
    auto local_context = std::move(agg_context);
    local_context->convertToTwoLevel(index);
    return ExecTaskStatus::FINISHED;
}

} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/Tasks/IOEventTask.h>

namespace DB
{
class AggregateContext;
using AggregateContextPtr = std::shared_ptr<AggregateContext>;

/// Task that converts one aggregate hash table (identified by `index`) to the
/// two-level layout; scheduled by AggregateFinalConvertEvent after the hash
/// aggregation build finishes.
class AggregateFinalConvertTask : public EventTask
{
public:
    /// @param agg_context_ shared aggregate build state; asserted non-null.
    /// @param index_       index of the hash table this task converts.
    AggregateFinalConvertTask(
        PipelineExecutorContext & exec_context_,
        const String & req_id,
        const EventPtr & event_,
        AggregateContextPtr agg_context_,
        size_t index_);

protected:
    // Performs the conversion and returns FINISHED in one step.
    ExecTaskStatus executeImpl() override;

private:
    AggregateContextPtr agg_context;
    size_t index;
};
} // namespace DB
84 changes: 58 additions & 26 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h>
#include <Flash/Pipeline/Schedule/Events/AggregateFinalSpillEvent.h>
#include <Flash/Planner/Plans/PhysicalAggregationBuild.h>
#include <Interpreters/Context.h>
Expand All @@ -23,6 +25,11 @@

namespace DB
{
namespace FailPoints
{
extern const char force_agg_two_level_hash_table_before_merge[];
} // namespace FailPoints

void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
PipelineExecutorContext & exec_context,
PipelineExecGroupBuilder & group_builder,
Expand Down Expand Up @@ -77,6 +84,9 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec_context)
{
assert(aggregate_context);

SCOPE_EXIT({ aggregate_context.reset(); });

aggregate_context->getAggSpillContext()->finishSpillableStage();
bool need_final_spill = false;
for (size_t i = 0; i < aggregate_context->getBuildConcurrency(); ++i)
Expand All @@ -87,37 +97,59 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec
break;
}
}
if (!aggregate_context->hasSpilledData() && !need_final_spill)
{
aggregate_context.reset();
return nullptr;
}

/// Currently, the aggregation spill algorithm requires all bucket data to be spilled,
/// so a new event is added here to execute the final spill.
/// ...──►AggregateBuildSinkOp[local spill]──┐
/// ...──►AggregateBuildSinkOp[local spill]──┤ ┌──►AggregateFinalSpillTask
/// ...──►AggregateBuildSinkOp[local spill]──┼──►[final spill]AggregateFinalSpillEvent─┼──►AggregateFinalSpillTask
/// ...──►AggregateBuildSinkOp[local spill]──┤ └──►AggregateFinalSpillTask
/// ...──►AggregateBuildSinkOp[local spill]──┘
std::vector<size_t> indexes;
for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
if (need_final_spill)
{
if (aggregate_context->needSpill(index, /*try_mark_need_spill=*/true))
indexes.push_back(index);
/// Currently, the aggregation spill algorithm requires all bucket data to be spilled,
/// so a new event is added here to execute the final spill.
/// ...──►AggregateBuildSinkOp[local spill]──┐
/// ...──►AggregateBuildSinkOp[local spill]──┤ ┌──►AggregateFinalSpillTask
/// ...──►AggregateBuildSinkOp[local spill]──┼──►[final spill]AggregateFinalSpillEvent─┼──►AggregateFinalSpillTask
/// ...──►AggregateBuildSinkOp[local spill]──┤ └──►AggregateFinalSpillTask
/// ...──►AggregateBuildSinkOp[local spill]──┘
std::vector<size_t> indexes;
for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
{
if (aggregate_context->needSpill(index, /*try_mark_need_spill=*/true))
indexes.push_back(index);
}
if (!indexes.empty())
{
auto final_spill_event = std::make_shared<AggregateFinalSpillEvent>(
exec_context,
log->identifier(),
aggregate_context,
std::move(indexes),
std::move(profile_infos));
return final_spill_event;
}
}
if (!indexes.empty())

if (!aggregate_context->hasSpilledData() && aggregate_context->isConvertibleToTwoLevel())
{
auto final_spill_event = std::make_shared<AggregateFinalSpillEvent>(
exec_context,
log->identifier(),
aggregate_context,
std::move(indexes),
std::move(profile_infos));
aggregate_context.reset();
return final_spill_event;
bool has_two_level = aggregate_context->hasAtLeastOneTwoLevel();
fiu_do_on(FailPoints::force_agg_two_level_hash_table_before_merge, { has_two_level = true; });
if (has_two_level)
{
std::vector<size_t> indexes;
for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
{
if (!aggregate_context->isTwoLevelOrEmpty(index))
indexes.push_back(index);
}
if (!indexes.empty())
{
auto final_convert_event = std::make_shared<AggregateFinalConvertEvent>(
exec_context,
log->identifier(),
aggregate_context,
std::move(indexes),
std::move(profile_infos));
return final_convert_event;
}
}
}
aggregate_context.reset();

return nullptr;
}
} // namespace DB
27 changes: 21 additions & 6 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace DB
namespace FailPoints
{
extern const char force_agg_on_partial_block[];
extern const char force_agg_two_level_hash_table_before_merge[];
} // namespace FailPoints
namespace tests
{
Expand Down Expand Up @@ -355,9 +356,16 @@ try
for (size_t i = 0; i < test_num; ++i)
{
request = buildDAGRequest(std::make_pair(db_name, table_types), {}, group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
for (auto force_two_level : {false, true})
{
if (force_two_level)
FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
else
FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}

Expand Down Expand Up @@ -414,9 +422,16 @@ try
for (size_t i = 0; i < test_num; ++i)
{
request = buildDAGRequest(std::make_pair(db_name, table_types), {}, group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
for (auto force_two_level : {false, true})
{
if (force_two_level)
FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
else
FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,7 @@ class Aggregator
/// Merge several partially aggregated blocks into one.
BlocksList vstackBlocks(BlocksList & blocks, bool final);

bool isConvertibleToTwoLevel() { return AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen); }
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Operators/AggregateContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,15 @@ Block AggregateContext::readForConvergent(size_t index)
return {};
return merging_buckets->getData(index);
}

/// Scans the per-thread hash tables and reports whether any of them has
/// already switched to the two-level layout.
bool AggregateContext::hasAtLeastOneTwoLevel()
{
    size_t thread_index = 0;
    while (thread_index < max_threads)
    {
        if (many_data[thread_index]->isTwoLevel())
            return true;
        ++thread_index;
    }
    return false;
}

} // namespace DB
Loading