From 6408d60e2320b48e6217b56798764c073a667e5d Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Mon, 18 Dec 2023 15:48:22 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #8541 Signed-off-by: ti-chi-bot --- .../DeltaMerge/ReadThread/UnorderedInputStream.h | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index c69a9f1cb3d..10ece39f756 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -46,9 +46,17 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } +<<<<<<< HEAD void cancel(bool /*kill*/) override { decreaseRefCount(); } ~UnorderedInputStream() override { decreaseRefCount(); } +======= + ~UnorderedInputStream() override + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + } +>>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) String getName() const override { return NAME; } @@ -65,6 +73,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream } protected: +<<<<<<< HEAD void decreaseRefCount() { bool ori = false; @@ -75,6 +84,8 @@ class UnorderedInputStream : public IProfilingBlockInputStream } } +======= +>>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) Block readImpl() override { FilterPtr filter_ignored; @@ -154,7 +165,5 @@ class UnorderedInputStream : public IProfilingBlockInputStream // runtime filter std::vector runtime_filter_list; int max_wait_time_ms; - - std::atomic_bool is_stopped = false; }; } // namespace DB::DM From 256c920346ec71bcbc3d45e681775d7df7904036 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 18 Dec 2023 15:56:52 +0800 Subject: [PATCH 2/2] fix conflict Signed-off-by: gengliqi --- .../ReadThread/UnorderedInputStream.h | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index 10ece39f756..a887f187587 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -46,17 +46,11 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } -<<<<<<< HEAD - void cancel(bool /*kill*/) override { decreaseRefCount(); } - - ~UnorderedInputStream() override { decreaseRefCount(); } -======= ~UnorderedInputStream() override { task_pool->decreaseUnorderedInputStreamRefCount(); LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } ->>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) String getName() const override { return NAME; } @@ -73,19 +67,6 @@ class UnorderedInputStream : public IProfilingBlockInputStream } protected: -<<<<<<< HEAD - void decreaseRefCount() - { - bool ori = false; - if (is_stopped.compare_exchange_strong(ori, true)) - { - task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); - } - } - -======= ->>>>>>> 72db40557a (Revert "Refine cancel for read thread stream (#8511)" (#8541)) Block readImpl() override { FilterPtr filter_ignored;