Skip to content

Commit

Permalink
do not mark spill if there is no data to spill in auto spill mode (#8906
Browse files Browse the repository at this point in the history
) (#8912)

close #8905
  • Loading branch information
ti-chi-bot authored Apr 9, 2024
1 parent d16a222 commit 9f01a89
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 6 deletions.
7 changes: 6 additions & 1 deletion dbms/src/Interpreters/AggSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thr
if (!in_spillable_stage || !isSpillEnabled())
return false;
per_thread_revocable_memories[thread_num] = new_value;
if (new_value == 0)
// new_value == 0 means no agg data to spill
return false;
if (auto_spill_mode)
{
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
Expand Down Expand Up @@ -97,6 +100,8 @@ Int64 AggSpillContext::triggerSpillImpl(Int64 expected_released_memories)
for (; checked_thread < per_thread_revocable_memories.size(); ++checked_thread)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
if (per_thread_revocable_memories[checked_thread] < MIN_SPILL_THRESHOLD)
continue;
if (per_thread_auto_spill_status[checked_thread].compare_exchange_strong(
old_value,
AutoSpillStatus::NEED_AUTO_SPILL))
Expand Down Expand Up @@ -146,7 +151,7 @@ void AggSpillContext::finishOneSpill(size_t thread_num)

bool AggSpillContext::markThreadForAutoSpill(size_t thread_num)
{
if (in_spillable_stage && isSpillEnabled())
if (in_spillable_stage && isSpillEnabled() && per_thread_revocable_memories[thread_num] > 0)
{
auto old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
return per_thread_auto_spill_status[thread_num].compare_exchange_strong(
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ bool Aggregator::executeOnBlock(AggProcessInfo & agg_process_info, AggregatedDat
LOG_TRACE(log, "Revocable bytes after insert one block {}, thread {}", revocable_bytes, thread_num);
if (agg_spill_context->updatePerThreadRevocableMemory(revocable_bytes, thread_num))
{
assert(!result.empty());
result.tryMarkNeedSpill();
}

Expand Down Expand Up @@ -1331,7 +1332,7 @@ inline void Aggregator::insertAggregatesIntoColumns(
for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i)
{
/// If ownership was not transferred to ColumnAggregateFunction.
if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState()))
if (destroy_i >= insert_i || !aggregate_functions[destroy_i]->isState())
aggregate_functions[destroy_i]->destroy(mapped + offsets_of_aggregate_states[destroy_i]);
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/HashJoinSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ bool HashJoinSpillContext::updatePartitionRevocableMemory(size_t partition_id, I
return false;
bool is_spilled = (*partition_is_spilled)[partition_id];
(*partition_revocable_memories)[partition_id] = new_value;
if (new_value == 0)
return false;
if (operator_spill_threshold > 0)
{
auto force_spill = is_spilled && operator_spill_threshold > 0
Expand Down Expand Up @@ -244,7 +246,7 @@ Int64 HashJoinSpillContext::triggerSpillImpl(Int64 expected_released_memories)
});
for (const auto & pair : partition_index_to_revocable_memories)
{
if (pair.second.second <= 0)
if (pair.second.second < MIN_SPILL_THRESHOLD)
continue;
if (!in_build_stage && !isPartitionSpilled(pair.first))
/// no new partition spill is allowed if not in build stage
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Interpreters/SortSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ bool SortSpillContext::updateRevocableMemory(Int64 new_value)
if (!in_spillable_stage || !isSpillEnabled())
return false;
revocable_memory = new_value;
if (new_value == 0)
return false;
if (auto_spill_mode)
{
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
Expand Down Expand Up @@ -69,9 +71,12 @@ bool SortSpillContext::updateRevocableMemory(Int64 new_value)

Int64 SortSpillContext::triggerSpillImpl(DB::Int64 expected_released_memories)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - revocable_memory, 0);
if (revocable_memory >= MIN_SPILL_THRESHOLD)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - revocable_memory, 0);
}
return expected_released_memories;
}

Expand Down
32 changes: 32 additions & 0 deletions dbms/src/Interpreters/tests/gtest_operator_spill_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,38 @@ try
}
CATCH

TEST_F(TestOperatorSpillContext, AutoTriggerSpillOnEmptyData)
try
{
auto agg_spill_context = std::make_shared<AggSpillContext>(2, *spill_config_ptr, 0, logger);
agg_spill_context->setAutoSpillMode();
agg_spill_context->updatePerThreadRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD, 0);
agg_spill_context->updatePerThreadRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD, 1);
ASSERT_TRUE(agg_spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD) == 0);
ASSERT_TRUE(agg_spill_context->isThreadMarkedForAutoSpill(0));
ASSERT_FALSE(agg_spill_context->isThreadMarkedForAutoSpill(1));

auto sort_spill_context = std::make_shared<SortSpillContext>(*spill_config_ptr, 0, logger);
sort_spill_context->setAutoSpillMode();
ASSERT_FALSE(sort_spill_context->updateRevocableMemory(0));
ASSERT_TRUE(
sort_spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD)
== OperatorSpillContext::MIN_SPILL_THRESHOLD);

auto join_spill_context = std::make_shared<HashJoinSpillContext>(*spill_config_ptr, *spill_config_ptr, 0, logger);
join_spill_context->setAutoSpillMode();
join_spill_context->init(2);
ASSERT_TRUE(
join_spill_context->updatePartitionRevocableMemory(0, OperatorSpillContext::MIN_SPILL_THRESHOLD) == false);
ASSERT_TRUE(join_spill_context->updatePartitionRevocableMemory(1, 0) == false);
ASSERT_TRUE(
join_spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD * 2)
== OperatorSpillContext::MIN_SPILL_THRESHOLD);
ASSERT_TRUE(join_spill_context->isPartitionMarkedForAutoSpill(0));
ASSERT_FALSE(join_spill_context->isPartitionMarkedForAutoSpill(1));
}
CATCH

TEST_F(TestOperatorSpillContext, SortMarkSpill)
try
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Operators/AggregateContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ bool AggregateContext::isTaskMarkedForSpill(size_t task_index)
return true;
if (getAggSpillContext()->updatePerThreadRevocableMemory(many_data[task_index]->revocableBytes(), task_index))
{
assert(!many_data[task_index]->empty());
return many_data[task_index]->tryMarkNeedSpill();
}
return false;
Expand Down

0 comments on commit 9f01a89

Please sign in to comment.