Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize the converting hash table to two level process after finishing hash agg build #8957

Merged
merged 13 commits into from
Apr 25, 2024
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
@@ -111,7 +111,8 @@ namespace DB
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache)
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h>
#include <Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h>

namespace DB
{
void AggregateFinalConvertEvent::scheduleImpl()
{
assert(agg_context);
for (auto index : indexes)
addTask(std::make_unique<AggregateFinalConvertTask>(
exec_context,
log->identifier(),
shared_from_this(),
agg_context,
index));
}

void AggregateFinalConvertEvent::finishImpl()
{
auto dur = getFinishDuration();
for (const auto & profile_info : profile_infos)
profile_info->execution_time += dur;
agg_context.reset();
}
} // namespace DB
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/Events/Event.h>
#include <Operators/OperatorProfileInfo.h>

namespace DB
{
class AggregateContext;
using AggregateContextPtr = std::shared_ptr<AggregateContext>;

/// Event scheduled after the hash-aggregation build finishes. It converts,
/// in parallel, the per-thread single-level hash tables identified by
/// `indexes` into the two-level layout (one task per index), so that all
/// partial results share the two-level format before the merge phase.
class AggregateFinalConvertEvent : public Event
{
public:
    /// @param agg_context_   shared aggregate build state; must be non-null.
    /// @param indexes_       indexes of the hash tables that still need
    ///                       conversion; must be non-empty.
    /// @param profile_infos_ operator profiles charged with this event's
    ///                       duration in finishImpl(); must be non-empty.
    AggregateFinalConvertEvent(
        PipelineExecutorContext & exec_context_,
        const String & req_id,
        AggregateContextPtr agg_context_,
        std::vector<size_t> && indexes_,
        OperatorProfileInfos && profile_infos_)
        : Event(exec_context_, req_id)
        , agg_context(std::move(agg_context_))
        , indexes(std::move(indexes_))
        , profile_infos(std::move(profile_infos_))
    {
        assert(agg_context);
        assert(!indexes.empty());
        assert(!profile_infos.empty());
    }

protected:
    // Creates one AggregateFinalConvertTask per entry in `indexes`.
    void scheduleImpl() override;

    // Adds the event duration to `profile_infos` and releases `agg_context`.
    void finishImpl() override;

private:
    // Shared aggregate build state; reset in finishImpl().
    AggregateContextPtr agg_context;
    // Hash-table indexes awaiting conversion to two-level.
    std::vector<size_t> indexes;

    OperatorProfileInfos profile_infos;
};
} // namespace DB
Original file line number Diff line number Diff line change
@@ -29,8 +29,8 @@ class AggregateFinalSpillEvent : public Event
PipelineExecutorContext & exec_context_,
const String & req_id,
AggregateContextPtr agg_context_,
std::vector<size_t> indexes_,
OperatorProfileInfos profile_infos_)
std::vector<size_t> && indexes_,
OperatorProfileInfos && profile_infos_)
: Event(exec_context_, req_id)
, agg_context(std::move(agg_context_))
, indexes(std::move(indexes_))
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h>
#include <Operators/AggregateContext.h>

namespace DB
{
// Task that converts a single per-thread aggregate hash table (selected
// by index_) to the two-level layout. Created by AggregateFinalConvertEvent.
AggregateFinalConvertTask::AggregateFinalConvertTask(
    PipelineExecutorContext & exec_context_,
    const String & req_id,
    const EventPtr & event_,
    AggregateContextPtr agg_context_,
    size_t index_)
    : EventTask(exec_context_, req_id, event_)
    , agg_context(std::move(agg_context_))
    , index(index_)
{
    // The context must outlive the task; validate the moved-in member.
    assert(agg_context != nullptr);
}

// Perform the two-level conversion for this task's hash table, release
// the shared aggregate context, and finish in a single execution step.
ExecTaskStatus AggregateFinalConvertTask::executeImpl()
{
    // Moving the shared pointer into a local releases the member reference
    // when the local goes out of scope — equivalent to reset() after use.
    auto local_agg_context = std::move(agg_context);
    local_agg_context->convertToTwoLevel(index);
    return ExecTaskStatus::FINISHED;
}

} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/Tasks/IOEventTask.h>

namespace DB
{
class AggregateContext;
using AggregateContextPtr = std::shared_ptr<AggregateContext>;

/// Pipeline task that converts one per-thread aggregate hash table
/// (selected by `index`) to the two-level layout. Instances are created
/// by AggregateFinalConvertEvent, one per table needing conversion.
class AggregateFinalConvertTask : public EventTask
{
public:
    /// @param agg_context_ shared aggregate build state; must be non-null.
    /// @param index_       index of the hash table this task converts.
    AggregateFinalConvertTask(
        PipelineExecutorContext & exec_context_,
        const String & req_id,
        const EventPtr & event_,
        AggregateContextPtr agg_context_,
        size_t index_);

protected:
    // Runs the conversion once, releases agg_context, and returns FINISHED.
    ExecTaskStatus executeImpl() override;

private:
    AggregateContextPtr agg_context;
    // Hash-table index assigned to this task.
    size_t index;
};
} // namespace DB
49 changes: 43 additions & 6 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h>
#include <Flash/Pipeline/Schedule/Events/AggregateFinalSpillEvent.h>
#include <Flash/Planner/Plans/PhysicalAggregationBuild.h>
#include <Interpreters/Context.h>
@@ -23,6 +25,11 @@

namespace DB
{
namespace FailPoints
{
extern const char force_agg_two_level_hash_table_before_merge[];
} // namespace FailPoints

void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
PipelineExecutorContext & exec_context,
PipelineExecGroupBuilder & group_builder,
@@ -77,6 +84,39 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec_context)
{
assert(aggregate_context);

SCOPE_EXIT({ aggregate_context.reset(); });
if (!aggregate_context->hasSpilledData())
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check should at least be moved to after aggregate_context->getAggSpillContext()->finishSpillableStage(); How about moving the whole code block to after the AggregateFinalSpillEvent check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

{
if (!aggregate_context->isConvertibleToTwoLevel())
return nullptr;

bool has_two_level = aggregate_context->hasAtLeastOneTwoLevel();
fiu_do_on(FailPoints::force_agg_two_level_hash_table_before_merge, { has_two_level = true; });
if (!has_two_level)
return nullptr;

std::vector<size_t> indexes;
for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
{
if (!aggregate_context->isTwoLevelOrEmpty(index))
indexes.push_back(index);
}

if (!indexes.empty())
{
auto final_convert_event = std::make_shared<AggregateFinalConvertEvent>(
exec_context,
log->identifier(),
aggregate_context,
std::move(indexes),
std::move(profile_infos));
return final_convert_event;
}

return nullptr;
}

aggregate_context->getAggSpillContext()->finishSpillableStage();
bool need_final_spill = false;
for (size_t i = 0; i < aggregate_context->getBuildConcurrency(); ++i)
@@ -87,11 +127,9 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec
break;
}
}
if (!aggregate_context->hasSpilledData() && !need_final_spill)
{
aggregate_context.reset();

if (!need_final_spill)
return nullptr;
}

/// Currently, the aggregation spill algorithm requires all bucket data to be spilled,
/// so a new event is added here to execute the final spill.
@@ -114,10 +152,9 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec
aggregate_context,
std::move(indexes),
std::move(profile_infos));
aggregate_context.reset();
return final_spill_event;
}
aggregate_context.reset();

return nullptr;
}
} // namespace DB
27 changes: 21 additions & 6 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ namespace DB
namespace FailPoints
{
extern const char force_agg_on_partial_block[];
extern const char force_agg_two_level_hash_table_before_merge[];
} // namespace FailPoints
namespace tests
{
@@ -355,9 +356,16 @@ try
for (size_t i = 0; i < test_num; ++i)
{
request = buildDAGRequest(std::make_pair(db_name, table_types), {}, group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
for (auto force_two_level : {false, true})
{
if (force_two_level)
FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
else
FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}

@@ -414,9 +422,16 @@ try
for (size_t i = 0; i < test_num; ++i)
{
request = buildDAGRequest(std::make_pair(db_name, table_types), {}, group_by_exprs[i], projections[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
for (auto force_two_level : {false, true})
{
if (force_two_level)
FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
else
FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge);
WRAP_FOR_AGG_PARTIAL_BLOCK_START
executeAndAssertColumnsEqual(request, expect_cols[i]);
WRAP_FOR_AGG_PARTIAL_BLOCK_END
}
}
}

1 change: 1 addition & 0 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
@@ -1194,6 +1194,7 @@ class Aggregator
/// Merge several partially aggregated blocks into one.
BlocksList vstackBlocks(BlocksList & blocks, bool final);

bool isConvertibleToTwoLevel() { return AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen); }
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
11 changes: 11 additions & 0 deletions dbms/src/Operators/AggregateContext.cpp
Original file line number Diff line number Diff line change
@@ -208,4 +208,15 @@ Block AggregateContext::readForConvergent(size_t index)
return {};
return merging_buckets->getData(index);
}

bool AggregateContext::hasAtLeastOneTwoLevel()
{
for (size_t i = 0; i < max_threads; ++i)
{
if (many_data[i]->isTwoLevel())
return true;
}
return false;
}

} // namespace DB
10 changes: 9 additions & 1 deletion dbms/src/Operators/AggregateContext.h
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ struct ThreadData
size_t src_bytes = 0;

Aggregator::AggProcessInfo agg_process_info;
ThreadData(Aggregator * aggregator)
    // Binds the per-thread aggregation process state to the aggregator.
    // `explicit` prevents accidental implicit conversion from Aggregator*.
    explicit ThreadData(Aggregator * aggregator)
        : agg_process_info(aggregator)
    {}
};
@@ -84,6 +84,14 @@ class AggregateContext

size_t getTotalBuildRows(size_t task_index) { return threads_data[task_index]->src_rows; }

    // True if any per-thread hash table has already become two-level.
    bool hasAtLeastOneTwoLevel();
    // True if the chosen aggregation method supports two-level conversion.
    bool isConvertibleToTwoLevel() const { return aggregator->isConvertibleToTwoLevel(); }
    // True if the given thread's hash table is already two-level or holds no
    // data, i.e. no conversion work is required for it.
    bool isTwoLevelOrEmpty(size_t task_index) const
    {
        return many_data[task_index]->isTwoLevel() || many_data[task_index]->empty();
    }
    // Convert the given thread's hash table to the two-level layout in place.
    void convertToTwoLevel(size_t task_index) { many_data[task_index]->convertToTwoLevel(); }

private:
std::unique_ptr<Aggregator> aggregator;
bool keys_size = false;