From b0b630431a0c0f477b8424c1f1fc9b900a83bdd4 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 12 Dec 2023 23:04:26 +0800 Subject: [PATCH 1/6] process streams of partition tables one by one Signed-off-by: gengliqi --- dbms/src/DataStreams/MultiplexInputStream.h | 60 +++------------------ 1 file changed, 7 insertions(+), 53 deletions(-) diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index ceddfa22f90..33e42426cbe 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -38,38 +38,25 @@ class MultiPartitionStreamPool if (cur_streams.empty()) return; std::unique_lock lk(mu); - streams_queue_by_partition.push_back(std::make_shared>>()); - for (const auto & stream : cur_streams) - streams_queue_by_partition.back()->push(stream); added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end()); } - std::shared_ptr pickOne() + BlockInputStreamPtr pickOne() { std::unique_lock lk(mu); - if (streams_queue_by_partition.empty()) + if (added_streams.empty()) return nullptr; - if (streams_queue_id >= static_cast(streams_queue_by_partition.size())) - streams_queue_id = 0; - - auto & q = *streams_queue_by_partition[streams_queue_id]; - std::shared_ptr ret = nullptr; - assert(!q.empty()); - ret = q.front(); - q.pop(); - if (q.empty()) - streams_queue_id = removeQueue(streams_queue_id); - else - streams_queue_id = nextQueueId(streams_queue_id); + + auto ret = added_streams.front(); + added_streams.pop_front(); return ret; } - int exportAddedStreams(BlockInputStreams & ret_streams) + void exportAddedStreams(BlockInputStreams & ret_streams) { std::unique_lock lk(mu); for (auto & stream : added_streams) ret_streams.push_back(stream); - return added_streams.size(); } int addedStreamsCnt() @@ -79,40 +66,7 @@ class MultiPartitionStreamPool } private: - int removeQueue(int queue_id) - { - streams_queue_by_partition[queue_id] = nullptr; - if (queue_id != static_cast(streams_queue_by_partition.size()) - 1) - { - swap(streams_queue_by_partition[queue_id], streams_queue_by_partition.back()); - streams_queue_by_partition.pop_back(); - return queue_id; - } - else - { - streams_queue_by_partition.pop_back(); - return 0; - } - } - - int nextQueueId(int queue_id) const - { - if (queue_id + 1 < static_cast(streams_queue_by_partition.size())) - return queue_id + 1; - else - return 0; - } - - static void swap( - std::shared_ptr>> & a, - std::shared_ptr>> & b) - { - a.swap(b); - } - - std::vector>>> streams_queue_by_partition; - std::vector> added_streams; - int streams_queue_id = 0; + std::deque added_streams; std::mutex mu; }; From 3f35df52e9b13473e5a40e01794123e7dbe39476 Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Wed, 13 Dec 2023 14:17:05 +0800 Subject: [PATCH 2/6] Update dbms/src/DataStreams/MultiplexInputStream.h Co-authored-by: SeaRise --- dbms/src/DataStreams/MultiplexInputStream.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index 33e42426cbe..00b4ad0c95a 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -47,7 +47,7 @@ class MultiPartitionStreamPool if (added_streams.empty()) return nullptr; - auto ret = added_streams.front(); + auto ret = std::move(added_streams.front()); added_streams.pop_front(); return ret; } From 043ceaac76f8fc1214db438ada36f1972c44b483 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 13 Dec 2023 15:31:48 +0800 Subject: [PATCH 3/6] update cancel Signed-off-by: gengliqi --- dbms/src/DataStreams/MultiplexInputStream.h | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index 00b4ad0c95a..caf25b4c174 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -33,6 +33,19 @@ class MultiPartitionStreamPool public: MultiPartitionStreamPool() = default; + void cancel(bool kill) + { + bool old_val = false; + if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) + return; + + for (auto & stream : added_streams) + if (auto * p_stream = dynamic_cast(stream.get())) + { + p_stream->cancel(kill); + } + } + void addPartitionStreams(const BlockInputStreams & cur_streams) { if (cur_streams.empty()) @@ -68,6 +81,7 @@ class MultiPartitionStreamPool private: std::deque added_streams; std::mutex mu; + std::atomic_bool is_cancelled{false}; }; class MultiplexInputStream final : public IProfilingBlockInputStream @@ -124,6 +138,8 @@ class MultiplexInputStream final : public IProfilingBlockInputStream child->cancel(kill); } } + + shared_pool->cancel(kill); } Block getHeader() const override { return children.at(0)->getHeader(); } From 279556e1675cd1b810f6722ffa331d55c6fe9562 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 13 Dec 2023 18:37:09 +0800 Subject: [PATCH 4/6] address comment Signed-off-by: gengliqi --- dbms/src/DataStreams/MultiplexInputStream.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index caf25b4c174..4cb6ebcc60d 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -35,10 +35,11 @@ class MultiPartitionStreamPool void cancel(bool kill) { - bool old_val = false; - if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) + std::unique_lock lk(mu); + if (is_cancelled) return; + is_cancelled = true; for (auto & stream : added_streams) if (auto * p_stream = dynamic_cast(stream.get())) { @@ -80,8 +81,8 @@ class MultiPartitionStreamPool private: std::deque added_streams; + bool is_cancelled; std::mutex mu; - std::atomic_bool is_cancelled{false}; }; class MultiplexInputStream final : public IProfilingBlockInputStream From 5c3bc027de1bfb847bb403094c1e69d3b42a6bfc Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 14 Dec 2023 14:39:45 +0800 Subject: [PATCH 5/6] update Signed-off-by: gengliqi --- dbms/src/DataStreams/MultiplexInputStream.h | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index 4cb6ebcc60d..78c00c8e76c 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -35,12 +35,17 @@ class MultiPartitionStreamPool void cancel(bool kill) { - std::unique_lock lk(mu); - if (is_cancelled) - return; + std::deque tmp_streams; + { + std::unique_lock lk(mu); + if (is_cancelled) + return; - is_cancelled = true; - for (auto & stream : added_streams) + is_cancelled = true; + tmp_streams.swap(added_streams); + } + + for (auto & stream : tmp_streams) if (auto * p_stream = dynamic_cast(stream.get())) { p_stream->cancel(kill); From 678ea76eef67393687aed41861a88b8cfbf68e90 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 14 Dec 2023 15:57:11 +0800 Subject: [PATCH 6/6] update Signed-off-by: gengliqi --- .../Storages/DeltaMerge/ReadThread/UnorderedInputStream.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index 2bdee467848..7e3ce7d4cc8 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -46,9 +46,9 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } - void cancel(bool /*kill*/) override { decreaseRefCount(); } + void cancel(bool /*kill*/) override { decreaseRefCount(true); } - ~UnorderedInputStream() override { decreaseRefCount(); } + ~UnorderedInputStream() override { decreaseRefCount(false); } String getName() const override { return NAME; } @@ -65,13 +65,13 @@ class UnorderedInputStream : public IProfilingBlockInputStream } protected: - void decreaseRefCount() + void decreaseRefCount(bool is_cancel) { bool ori = false; if (is_stopped.compare_exchange_strong(ori, true)) { task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->pool_id, ref_no); } }