diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index 99531b43bdb..eb6249aa08c 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -55,15 +55,18 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no); } - void cancel(bool /*kill*/) override { decreaseRefCount(true); } - - ~UnorderedInputStream() override { decreaseRefCount(false); } + ~UnorderedInputStream() override + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + } String getName() const override { return NAME; } Block getHeader() const override { return header; } protected: +<<<<<<< HEAD void decreaseRefCount(bool is_cancel) { bool ori = false; @@ -74,6 +77,8 @@ class UnorderedInputStream : public IProfilingBlockInputStream } } +======= +>>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) Block readImpl() override { FilterPtr filter_ignored; @@ -147,8 +152,15 @@ class UnorderedInputStream : public IProfilingBlockInputStream LoggerPtr log; int64_t ref_no; size_t total_rows = 0; +<<<<<<< HEAD bool task_pool_added; std::atomic_bool is_stopped = false; +======= + + // runtime filter + std::vector runtime_filter_list; + int max_wait_time_ms; +>>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) }; } // namespace DB::DM