Skip to content

Commit

Permalink
Refine cancel for read thread stream (#8511)
Browse files Browse the repository at this point in the history
ref #8505
  • Loading branch information
SeaRise authored Dec 13, 2023
1 parent 5417d84 commit adf8569
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ class UnorderedInputStream : public IProfilingBlockInputStream
LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
}

~UnorderedInputStream() override
{
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
}
void cancel(bool /*kill*/) override { decreaseRefCount(); }

~UnorderedInputStream() override { decreaseRefCount(); }

String getName() const override { return NAME; }

Expand All @@ -67,6 +65,16 @@ class UnorderedInputStream : public IProfilingBlockInputStream
}

protected:
void decreaseRefCount()
{
bool ori = false;
if (is_stopped.compare_exchange_strong(ori, true))
{
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
}
}

Block readImpl() override
{
FilterPtr filter_ignored;
Expand Down Expand Up @@ -146,5 +154,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
// runtime filter
std::vector<RuntimeFilterPtr> runtime_filter_list;
int max_wait_time_ms;

std::atomic_bool is_stopped = false;
};
} // namespace DB::DM

0 comments on commit adf8569

Please sign in to comment.