From 69a20c9fcd5f17f314ffe67199ed446c8ae92aad Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Mon, 18 Dec 2023 15:48:22 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #8541 Signed-off-by: ti-chi-bot --- .../ReadThread/UnorderedInputStream.h | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index a0a040057e7..019126e2cfa 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -56,15 +56,18 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no); } - void cancel(bool /*kill*/) override { decreaseRefCount(true); } - - ~UnorderedInputStream() override { decreaseRefCount(false); } + ~UnorderedInputStream() override + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + } String getName() const override { return NAME; } Block getHeader() const override { return header; } protected: +<<<<<<< HEAD void decreaseRefCount(bool is_cancel) { bool ori = false; @@ -75,6 +78,8 @@ class UnorderedInputStream : public IProfilingBlockInputStream } } +======= +>>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) Block readImpl() override { FilterPtr filter_ignored; @@ -138,6 +143,14 @@ class UnorderedInputStream : public IProfilingBlockInputStream int64_t ref_no; bool task_pool_added; +<<<<<<< HEAD std::atomic_bool is_stopped = false; +======= + size_t total_rows = 0; + + // runtime filter + std::vector runtime_filter_list; + int max_wait_time_ms; +>>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) }; } // namespace DB::DM From 19305c0a4d406b3084ce445c8285d1251ca6245f Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 18 Dec 2023 16:00:55 +0800 Subject: [PATCH 2/2] fix conflict Signed-off-by: gengliqi --- .../ReadThread/UnorderedInputStream.h | 25 +------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index 019126e2cfa..813f464bea2 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -59,7 +59,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream ~UnorderedInputStream() override { task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->poolId(), ref_no); } String getName() const override { return NAME; } @@ -67,19 +67,6 @@ class UnorderedInputStream : public IProfilingBlockInputStream Block getHeader() const override { return header; } protected: -<<<<<<< HEAD - void decreaseRefCount(bool is_cancel) - { - bool ori = false; - if (is_stopped.compare_exchange_strong(ori, true)) - { - task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->poolId(), ref_no); - } - } - -======= ->>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) Block readImpl() override { FilterPtr filter_ignored; @@ -142,15 +129,5 @@ class UnorderedInputStream : public IProfilingBlockInputStream LoggerPtr log; int64_t ref_no; bool task_pool_added; - -<<<<<<< HEAD - std::atomic_bool is_stopped = false; -======= - size_t total_rows = 0; - - // runtime filter - std::vector runtime_filter_list; - int max_wait_time_ms; ->>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) }; } // namespace DB::DM