diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index a887f187587..c69a9f1cb3d 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -46,11 +46,9 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } - ~UnorderedInputStream() override - { - task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); - } + void cancel(bool /*kill*/) override { decreaseRefCount(); } + + ~UnorderedInputStream() override { decreaseRefCount(); } String getName() const override { return NAME; } @@ -67,6 +65,16 @@ class UnorderedInputStream : public IProfilingBlockInputStream } protected: + void decreaseRefCount() + { + bool ori = false; + if (is_stopped.compare_exchange_strong(ori, true)) + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + } + } + Block readImpl() override { FilterPtr filter_ignored; @@ -146,5 +154,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream // runtime filter std::vector runtime_filter_list; int max_wait_time_ms; + + std::atomic_bool is_stopped = false; }; } // namespace DB::DM