Aggregation spill may get stuck in an infinite loop if an empty hash table is marked to spill #8905

Closed
windtalker opened this issue Apr 8, 2024 · 0 comments · Fixed by #8906
Labels
affects-7.4 affects-7.5 This bug affects the 7.5.x(LTS) versions. affects-8.0 component/compute severity/major type/bug The issue is confirmed as a bug.

Comments

@windtalker
Contributor

Bug Report

Please answer these questions before submitting your issue. Thanks!

1. Minimal reproduce step (Required)

Step 1

In

Int64 AggSpillContext::triggerSpillImpl(Int64 expected_released_memories)
{
    size_t checked_thread = 0;
    for (; checked_thread < per_thread_revocable_memories.size(); ++checked_thread)
    {
        AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
        if (per_thread_auto_spill_status[checked_thread].compare_exchange_strong(
                old_value,
                AutoSpillStatus::NEED_AUTO_SPILL))
        {
            LOG_DEBUG(
                log,
                "Mark thread {} to spill, expect to release {} bytes",
                checked_thread,
                per_thread_revocable_memories[checked_thread]);
        }
        expected_released_memories
            = std::max(expected_released_memories - per_thread_revocable_memories[checked_thread], 0);
        if (expected_released_memories == 0)
            break;
    }
    if (spill_config.max_cached_data_bytes_in_spiller > 0)
    {
        auto spill_threshold = static_cast<Int64>(spill_config.max_cached_data_bytes_in_spiller);
        for (size_t i = checked_thread + 1; i < per_thread_revocable_memories.size(); ++i)
        {
            /// unlike sort and hash join, the implementation of current agg spill does not support partial spill, that is to say,
            /// once agg spill is triggered, all the data will be spilled in the end, so here to spill the data if memory usage is large enough
            if (per_thread_revocable_memories[i] >= spill_threshold)
            {
                AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
                if (per_thread_auto_spill_status[i].compare_exchange_strong(
                        old_value,
                        AutoSpillStatus::NEED_AUTO_SPILL))
                {
                    LOG_DEBUG(
                        log,
                        "Mark thread {} to spill, expect to release {} bytes",
                        i,
                        per_thread_revocable_memories[i]);
                }
            }
        }
    }
    return expected_released_memories;
}

Even if per_thread_revocable_memories[index] is 0 (which means the hash table is empty), the first loop still marks the thread as NEED_AUTO_SPILL.
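The marking behaviour can be reproduced with a small single-threaded model of the first loop. This is only a sketch: `markThreadsForSpill` and the `skip_empty` switch are hypothetical names introduced here for illustration, plain enum values stand in for the atomics, and the guard is just one conceivable way to avoid marking empty threads, not necessarily what the actual fix does.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum class AutoSpillStatus { NO_NEED_AUTO_SPILL, NEED_AUTO_SPILL, WAIT_SPILL_FINISH };

// Simplified model of the first loop in triggerSpillImpl.
// `skip_empty` is a hypothetical switch that does not exist in the quoted code.
inline std::int64_t markThreadsForSpill(
    const std::vector<std::int64_t> & revocable_bytes,
    std::vector<AutoSpillStatus> & status,
    std::int64_t expected_released,
    bool skip_empty)
{
    assert(revocable_bytes.size() == status.size());
    for (std::size_t i = 0; i < revocable_bytes.size(); ++i)
    {
        // With the guard enabled, a thread that holds no revocable memory is left alone.
        if (skip_empty && revocable_bytes[i] == 0)
            continue;
        // Stands in for the CAS from NO_NEED_AUTO_SPILL to NEED_AUTO_SPILL.
        if (status[i] == AutoSpillStatus::NO_NEED_AUTO_SPILL)
            status[i] = AutoSpillStatus::NEED_AUTO_SPILL;
        expected_released = std::max<std::int64_t>(expected_released - revocable_bytes[i], 0);
        if (expected_released == 0)
            break;
    }
    return expected_released;
}
```

With `revocable_bytes = {0, 100}` and `skip_empty = false`, thread 0 is marked even though it can release nothing, which is the situation described above.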

Step 2

In

OperatorStatus AggregateBuildSinkOp::prepareImpl()
{
    while (agg_context->hasLocalDataToBuild(index))
    {
        agg_context->buildOnLocalData(index);
        if (agg_context->needSpill(index))
            return OperatorStatus::IO_OUT;
    }
    return agg_context->isTaskMarkedForSpill(index) ? OperatorStatus::IO_OUT : OperatorStatus::NEED_INPUT;
}

If there is no local data to build, prepareImpl skips the loop and checks isTaskMarkedForSpill directly.

Step 3

In

bool AggregateContext::isTaskMarkedForSpill(size_t task_index)
{
    if (needSpill(task_index))
        return true;
    if (getAggSpillContext()->updatePerThreadRevocableMemory(many_data[task_index]->revocableBytes(), task_index))
    {
        return many_data[task_index]->tryMarkNeedSpill();
    }
    return false;
}

isTaskMarkedForSpill calls updatePerThreadRevocableMemory.

Step 4

In

bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thread_num)
{
    if (!in_spillable_stage || !isSpillEnabled())
        return false;
    per_thread_revocable_memories[thread_num] = new_value;
    if (auto_spill_mode)
    {
        AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
        if (per_thread_auto_spill_status[thread_num].compare_exchange_strong(
                old_value,
                AutoSpillStatus::WAIT_SPILL_FINISH))
            /// in auto spill mode, don't set revocable_memory to 0 here, so in triggerSpill it will take
            /// the revocable_memory into account if current spill is on the way
            return true;
        bool ret = false;
        fiu_do_on(FailPoints::random_marked_for_auto_spill, {
            old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
            if (new_value > 0
                && per_thread_auto_spill_status[thread_num].compare_exchange_strong(
                    old_value,
                    AutoSpillStatus::WAIT_SPILL_FINISH))
                ret = true;
        });
        return ret;

It uses CAS to change per_thread_auto_spill_status[thread_num] from NEED_AUTO_SPILL to WAIT_SPILL_FINISH and returns true.
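For readers less familiar with the CAS call used here, the sketch below isolates that one transition. `tryStartSpill` is a hypothetical helper written for this illustration. It relies only on the standard behaviour of `std::atomic::compare_exchange_strong`: the swap succeeds only when the stored value equals the expected one, and on failure the expected argument is overwritten with the value actually stored.

```cpp
#include <atomic>
#include <cassert>

enum class AutoSpillStatus { NO_NEED_AUTO_SPILL, NEED_AUTO_SPILL, WAIT_SPILL_FINISH };

// Hypothetical helper: attempt the NEED_AUTO_SPILL -> WAIT_SPILL_FINISH transition.
// `observed` receives the value that was actually stored when the CAS fails.
inline bool tryStartSpill(std::atomic<AutoSpillStatus> & status, AutoSpillStatus & observed)
{
    observed = AutoSpillStatus::NEED_AUTO_SPILL;
    return status.compare_exchange_strong(observed, AutoSpillStatus::WAIT_SPILL_FINISH);
}
```

The first call on a status of NEED_AUTO_SPILL succeeds. A second call fails, because the status is already WAIT_SPILL_FINISH, and that is why the function can only return true once per marking.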

Step 5

In

bool AggregatedDataVariants::tryMarkNeedSpill()
{
    assert(!need_spill);
    if (empty())
        return false;

If the current hash table is empty, it returns false, which means there is no need to spill.

Step 6

After the above 5 steps, the aggregation is expected to be in spill status from the view of AggSpillContext (the status is WAIT_SPILL_FINISH), but it is not actually in spill state from the view of Aggregator. In Aggregator::executeOnBlock, it will throw ResizeException but cannot trigger a spill: the status is already WAIT_SPILL_FINISH, so the CAS from NEED_AUTO_SPILL fails and updatePerThreadRevocableMemory never returns true again, which leaves no chance to call tryMarkNeedSpill. Meanwhile Aggregator::executeOnBlock never aggregates any data, because as soon as it needs to resize the hash table it hits ResizeException again.
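The divergence between the two views can be reproduced with a toy state machine. The sketch below is a simplified model written for this report, not TiFlash code: `AggModel`, `markByMemoryManager`, `hash_table_bytes` and `diverged` are hypothetical names, the failpoint branch is left out, and ResizeException is not modelled.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

enum class AutoSpillStatus { NO_NEED_AUTO_SPILL, NEED_AUTO_SPILL, WAIT_SPILL_FINISH };

// Toy model of one aggregation thread (all names here are hypothetical).
struct AggModel
{
    std::atomic<AutoSpillStatus> status{AutoSpillStatus::NO_NEED_AUTO_SPILL};
    bool need_spill = false;           // the Aggregator's view
    std::int64_t hash_table_bytes = 0; // 0 models an empty hash table

    // Step 1: the spill context marks the thread regardless of its size.
    void markByMemoryManager()
    {
        auto expected = AutoSpillStatus::NO_NEED_AUTO_SPILL;
        status.compare_exchange_strong(expected, AutoSpillStatus::NEED_AUTO_SPILL);
    }

    // Step 4: only the NEED_AUTO_SPILL -> WAIT_SPILL_FINISH transition returns true.
    bool updatePerThreadRevocableMemory()
    {
        auto expected = AutoSpillStatus::NEED_AUTO_SPILL;
        return status.compare_exchange_strong(expected, AutoSpillStatus::WAIT_SPILL_FINISH);
    }

    // Step 5: an empty hash table refuses to be marked.
    bool tryMarkNeedSpill()
    {
        if (hash_table_bytes == 0)
            return false;
        need_spill = true;
        return true;
    }

    // Step 3: the check reached from prepareImpl.
    bool isTaskMarkedForSpill()
    {
        if (need_spill)
            return true;
        if (updatePerThreadRevocableMemory())
            return tryMarkNeedSpill();
        return false;
    }

    // True when the spill context waits for a spill the Aggregator will never start.
    bool diverged() const
    {
        return status.load() == AutoSpillStatus::WAIT_SPILL_FINISH && !need_spill;
    }
};
```

Calling `markByMemoryManager()` and then `isTaskMarkedForSpill()` on an empty model leaves it in the diverged state. Setting `hash_table_bytes` to a non-zero value afterwards does not help: every later `isTaskMarkedForSpill()` call still returns false, which matches the loop described in Step 6.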

2. What did you expect to see? (Required)

3. What did you see instead (Required)

4. What is your TiFlash version? (Required)
