diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index a0a040057e7..813f464bea2 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -56,25 +56,17 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no); } - void cancel(bool /*kill*/) override { decreaseRefCount(true); } - - ~UnorderedInputStream() override { decreaseRefCount(false); } + ~UnorderedInputStream() override + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->poolId(), ref_no); + } String getName() const override { return NAME; } Block getHeader() const override { return header; } protected: - void decreaseRefCount(bool is_cancel) - { - bool ori = false; - if (is_stopped.compare_exchange_strong(ori, true)) - { - task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->poolId(), ref_no); - } - } - Block readImpl() override { FilterPtr filter_ignored; @@ -137,7 +129,5 @@ class UnorderedInputStream : public IProfilingBlockInputStream LoggerPtr log; int64_t ref_no; bool task_pool_added; - - std::atomic_bool is_stopped = false; }; } // namespace DB::DM