diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 9d8ad1484d4..1f24d3030f4 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -111,7 +111,8 @@ namespace DB M(force_set_fap_candidate_store_id) \ M(force_not_clean_fap_on_destroy) \ M(delta_tree_create_node_fail) \ - M(disable_flush_cache) + M(disable_flush_cache) \ + M(force_agg_two_level_hash_table_before_merge) #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \ M(pause_with_alter_locks_acquired) \ diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp new file mode 100644 index 00000000000..9ad7b9af763 --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp @@ -0,0 +1,39 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include
+#include
+
+namespace DB
+{
+void AggregateFinalConvertEvent::scheduleImpl() // queues one task per entry of `indexes`
+{
+    assert(agg_context);
+    for (auto index : indexes)
+        addTask(std::make_unique(
+            exec_context,
+            log->identifier(),
+            shared_from_this(), // the task is constructed with a pointer back to this event
+            agg_context, // not moved: finishImpl() still resets this event's own handle
+            index));
+}
+
+void AggregateFinalConvertEvent::finishImpl() // adds the finish duration to every profile info
+{
+    auto dur = getFinishDuration();
+    for (const auto & profile_info : profile_infos)
+        profile_info->execution_time += dur;
+    agg_context.reset(); // drop this event's handle to the aggregate context
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h
new file mode 100644
index 00000000000..8f5f204d867
--- /dev/null
+++ b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h
@@ -0,0 +1,55 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+
+namespace DB
+{
+class AggregateContext;
+using AggregateContextPtr = std::shared_ptr;
+
+class AggregateFinalConvertEvent : public Event // event holding an aggregate context and the indexes to process
+{
+public:
+    AggregateFinalConvertEvent(
+        PipelineExecutorContext & exec_context_,
+        const String & req_id,
+        AggregateContextPtr agg_context_,
+        std::vector && indexes_,
+        OperatorProfileInfos && profile_infos_)
+        : Event(exec_context_, req_id)
+        , agg_context(std::move(agg_context_))
+        , indexes(std::move(indexes_))
+        , profile_infos(std::move(profile_infos_))
+    {
+        assert(agg_context); // a context is mandatory
+        assert(!indexes.empty()); // the event must have at least one index to work on
+        assert(!profile_infos.empty()); // finishImpl() iterates over these
+    }
+
+protected:
+    void scheduleImpl() override; // adds one task per element of `indexes`
+
+    void finishImpl() override; // updates `profile_infos` and resets `agg_context`
+
+private:
+    AggregateContextPtr agg_context; // reset in finishImpl()
+    std::vector indexes; // each element is passed to one task in scheduleImpl()
+
+    OperatorProfileInfos profile_infos; // execution_time of each entry is increased in finishImpl()
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalSpillEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalSpillEvent.h
index e8cc190f93e..d8d8e1a78de 100644
--- a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalSpillEvent.h
+++ b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalSpillEvent.h
@@ -29,8 +29,8 @@ class AggregateFinalSpillEvent : public Event
         PipelineExecutorContext & exec_context_,
         const String & req_id,
         AggregateContextPtr agg_context_,
-        std::vector indexes_,
-        OperatorProfileInfos profile_infos_)
+        std::vector && indexes_,
+        OperatorProfileInfos && profile_infos_)
         : Event(exec_context_, req_id)
         , agg_context(std::move(agg_context_))
         , indexes(std::move(indexes_))
diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.cpp
new file mode 100644
index 00000000000..e8a3f7bc89f
--- /dev/null
+++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.cpp
@@ -0,0 +1,40 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+
+namespace DB
+{
+AggregateFinalConvertTask::AggregateFinalConvertTask(
+    PipelineExecutorContext & exec_context_,
+    const String & req_id,
+    const EventPtr & event_,
+    AggregateContextPtr agg_context_,
+    size_t index_)
+    : EventTask(exec_context_, req_id, event_)
+    , agg_context(std::move(agg_context_))
+    , index(index_)
+{
+    assert(agg_context); // executeImpl() dereferences the context
+}
+
+ExecTaskStatus AggregateFinalConvertTask::executeImpl() // converts the entry at `index`, then finishes
+{
+    agg_context->convertToTwoLevel(index); // forwards to many_data[index]->convertToTwoLevel() in AggregateContext
+    agg_context.reset(); // this task's handle is not needed once the conversion has run
+    return ExecTaskStatus::FINISHED;
+}
+
+} // namespace DB
diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h
new file mode 100644
index 00000000000..d83f7f33cfb
--- /dev/null
+++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/AggregateFinalConvertTask.h
@@ -0,0 +1,41 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+
+namespace DB
+{
+class AggregateContext;
+using AggregateContextPtr = std::shared_ptr;
+
+class AggregateFinalConvertTask : public EventTask // task that calls convertToTwoLevel(index) on the aggregate context
+{
+public:
+    AggregateFinalConvertTask(
+        PipelineExecutorContext & exec_context_,
+        const String & req_id,
+        const EventPtr & event_,
+        AggregateContextPtr agg_context_,
+        size_t index_);
+
+protected:
+    ExecTaskStatus executeImpl() override; // returns ExecTaskStatus::FINISHED after the conversion call
+
+private:
+    AggregateContextPtr agg_context; // asserted non-null in the constructor, reset in executeImpl()
+    size_t index; // argument passed to AggregateContext::convertToTwoLevel()
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
index 2c2d521250e..32bd3d31033 100644
--- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
+++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
@@ -12,9 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -23,6 +25,11 @@
 
 namespace DB
 {
+namespace FailPoints
+{
+extern const char force_agg_two_level_hash_table_before_merge[];
+} // namespace FailPoints
+
 void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
     PipelineExecutorContext & exec_context,
     PipelineExecGroupBuilder & group_builder,
@@ -77,6 +84,14 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
+/// Picks the follow-up event (if any) to return from sink completion:
+/// - a final-spill event when data has already been spilled or `need_final_spill` is set,
+///   covering every index for which needSpill() returns true;
+/// - otherwise a final-convert event when nothing was spilled, the method is convertible, at least one
+///   entry is two-level (or the fail point forces it) and some entry is neither two-level nor empty;
 EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec_context)
 {
     assert(aggregate_context);
+
+    SCOPE_EXIT({ aggregate_context.reset(); }); // releases the member on every return path below
+
     aggregate_context->getAggSpillContext()->finishSpillableStage();
     bool need_final_spill = false;
     for (size_t i = 0; i < aggregate_context->getBuildConcurrency(); ++i)
@@ -87,37 +102,59 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec
             break;
         }
     }
-    if (!aggregate_context->hasSpilledData() && !need_final_spill)
-    {
-        aggregate_context.reset();
-        return nullptr;
-    }
 
-    /// Currently, the aggregation spill algorithm requires all bucket data to be spilled,
-    /// so a new event is added here to execute the final spill.
-    /// ...──►AggregateBuildSinkOp[local spill]──┐
-    /// ...──►AggregateBuildSinkOp[local spill]──┤                                         ┌──►AggregateFinalSpillTask
-    /// ...──►AggregateBuildSinkOp[local spill]──┼──►[final spill]AggregateFinalSpillEvent─┼──►AggregateFinalSpillTask
-    /// ...──►AggregateBuildSinkOp[local spill]──┤                                         └──►AggregateFinalSpillTask
-    /// ...──►AggregateBuildSinkOp[local spill]──┘
-    std::vector indexes;
-    for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
+    if (aggregate_context->hasSpilledData() || need_final_spill) // same trigger as before this change: either condition requires the final spill
     {
-        if (aggregate_context->needSpill(index, /*try_mark_need_spill=*/true))
-            indexes.push_back(index);
+        /// Currently, the aggregation spill algorithm requires all bucket data to be spilled,
+        /// so a new event is added here to execute the final spill.
+        /// ...──►AggregateBuildSinkOp[local spill]──┐
+        /// ...──►AggregateBuildSinkOp[local spill]──┤                                         ┌──►AggregateFinalSpillTask
+        /// ...──►AggregateBuildSinkOp[local spill]──┼──►[final spill]AggregateFinalSpillEvent─┼──►AggregateFinalSpillTask
+        /// ...──►AggregateBuildSinkOp[local spill]──┤                                         └──►AggregateFinalSpillTask
+        /// ...──►AggregateBuildSinkOp[local spill]──┘
+        std::vector indexes;
+        for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
+        {
+            if (aggregate_context->needSpill(index, /*try_mark_need_spill=*/true))
+                indexes.push_back(index);
+        }
+        if (!indexes.empty())
+        {
+            auto final_spill_event = std::make_shared(
+                exec_context,
+                log->identifier(),
+                aggregate_context,
+                std::move(indexes),
+                std::move(profile_infos));
+            return final_spill_event;
+        }
     }
-    if (!indexes.empty())
+
+    if (!aggregate_context->hasSpilledData() && aggregate_context->isConvertibleToTwoLevel())
     {
-        auto final_spill_event = std::make_shared(
-            exec_context,
-            log->identifier(),
-            aggregate_context,
-            std::move(indexes),
-            std::move(profile_infos));
-        aggregate_context.reset();
-        return final_spill_event;
+        bool has_two_level = aggregate_context->hasAtLeastOneTwoLevel();
+        fiu_do_on(FailPoints::force_agg_two_level_hash_table_before_merge, { has_two_level = true; });
+        if (has_two_level)
+        {
+            std::vector indexes;
+            for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index)
+            {
+                if (!aggregate_context->isTwoLevelOrEmpty(index)) // only entries that still need converting
+                    indexes.push_back(index);
+            }
+            if (!indexes.empty())
+            {
+                auto final_convert_event = std::make_shared(
+                    exec_context,
+                    log->identifier(),
+                    aggregate_context,
+                    std::move(indexes),
+                    std::move(profile_infos));
+                return final_convert_event;
+            }
+        }
     }
-    aggregate_context.reset();
+
     return nullptr;
 }
 } // namespace DB
diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp
index 33440878fab..cb1625dd2db 100644
---
a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -23,6 +23,7 @@ namespace DB namespace FailPoints { extern const char force_agg_on_partial_block[]; +extern const char force_agg_two_level_hash_table_before_merge[]; } // namespace FailPoints namespace tests { @@ -355,9 +356,16 @@ try for (size_t i = 0; i < test_num; ++i) { request = buildDAGRequest(std::make_pair(db_name, table_types), {}, group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_PARTIAL_BLOCK_START - executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_PARTIAL_BLOCK_END + for (auto force_two_level : {false, true}) + { + if (force_two_level) + FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); + else + FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); + WRAP_FOR_AGG_PARTIAL_BLOCK_START + executeAndAssertColumnsEqual(request, expect_cols[i]); + WRAP_FOR_AGG_PARTIAL_BLOCK_END + } } } @@ -414,9 +422,16 @@ try for (size_t i = 0; i < test_num; ++i) { request = buildDAGRequest(std::make_pair(db_name, table_types), {}, group_by_exprs[i], projections[i]); - WRAP_FOR_AGG_PARTIAL_BLOCK_START - executeAndAssertColumnsEqual(request, expect_cols[i]); - WRAP_FOR_AGG_PARTIAL_BLOCK_END + for (auto force_two_level : {false, true}) + { + if (force_two_level) + FailPointHelper::enableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); + else + FailPointHelper::disableFailPoint(FailPoints::force_agg_two_level_hash_table_before_merge); + WRAP_FOR_AGG_PARTIAL_BLOCK_START + executeAndAssertColumnsEqual(request, expect_cols[i]); + WRAP_FOR_AGG_PARTIAL_BLOCK_END + } } } diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 33a1ea1bd4c..a10b77f5a8b 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1194,6 +1194,7 @@ class Aggregator /// Merge several partially aggregated blocks 
into one.
     BlocksList vstackBlocks(BlocksList & blocks, bool final);
 
+    bool isConvertibleToTwoLevel() { return AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen); }
     /** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
       * This is needed to simplify merging of that data with other results, that are already two-level.
       */
diff --git a/dbms/src/Operators/AggregateContext.cpp b/dbms/src/Operators/AggregateContext.cpp
index 391b1acce57..4c1b77cf0ae 100644
--- a/dbms/src/Operators/AggregateContext.cpp
+++ b/dbms/src/Operators/AggregateContext.cpp
@@ -208,4 +208,16 @@ Block AggregateContext::readForConvergent(size_t index)
         return {};
     return merging_buckets->getData(index);
 }
+
+/// Returns true as soon as one of the first `max_threads` entries of `many_data` reports isTwoLevel().
+bool AggregateContext::hasAtLeastOneTwoLevel() const
+{
+    for (size_t i = 0; i < max_threads; ++i)
+    {
+        if (many_data[i]->isTwoLevel())
+            return true;
+    }
+    return false;
+}
+
 } // namespace DB
diff --git a/dbms/src/Operators/AggregateContext.h b/dbms/src/Operators/AggregateContext.h
index 5c1351a1cd1..9e736879414 100644
--- a/dbms/src/Operators/AggregateContext.h
+++ b/dbms/src/Operators/AggregateContext.h
@@ -28,7 +28,7 @@ struct ThreadData
     size_t src_bytes = 0;
     Aggregator::AggProcessInfo agg_process_info;
 
-    ThreadData(Aggregator * aggregator)
+    explicit ThreadData(Aggregator * aggregator)
         : agg_process_info(aggregator)
     {}
 };
@@ -84,6 +84,18 @@ class AggregateContext
 
     size_t getTotalBuildRows(size_t task_index) { return threads_data[task_index]->src_rows; }
 
+    /// True if at least one entry of `many_data` is already two-level. Read-only, like the queries below.
+    bool hasAtLeastOneTwoLevel() const;
+    /// Forwards to Aggregator::isConvertibleToTwoLevel().
+    bool isConvertibleToTwoLevel() const { return aggregator->isConvertibleToTwoLevel(); }
+    /// True if the entry for `task_index` is two-level or empty.
+    bool isTwoLevelOrEmpty(size_t task_index) const
+    {
+        return many_data[task_index]->isTwoLevel() || many_data[task_index]->empty();
+    }
+    /// Calls convertToTwoLevel() on the entry for `task_index`.
+    void convertToTwoLevel(size_t task_index) { many_data[task_index]->convertToTwoLevel(); }
+
 private:
     std::unique_ptr aggregator;
     bool keys_size = false;