diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index c418dcbb927..d9a265a50cd 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -33,44 +33,49 @@ class MultiPartitionStreamPool public: MultiPartitionStreamPool() = default; + void cancel(bool kill) + { + std::deque tmp_streams; + { + std::unique_lock lk(mu); + if (is_cancelled) + return; + + is_cancelled = true; + tmp_streams.swap(added_streams); + } + + for (auto & stream : tmp_streams) + if (auto * p_stream = dynamic_cast(stream.get())) + { + p_stream->cancel(kill); + } + } + void addPartitionStreams(const BlockInputStreams & cur_streams) { if (cur_streams.empty()) return; std::unique_lock lk(mu); - streams_queue_by_partition.push_back( - std::make_shared>>()); - for (const auto & stream : cur_streams) - streams_queue_by_partition.back()->push(stream); added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end()); } - std::shared_ptr pickOne() + BlockInputStreamPtr pickOne() { std::unique_lock lk(mu); - if (streams_queue_by_partition.empty()) + if (added_streams.empty()) return nullptr; - if (streams_queue_id >= static_cast(streams_queue_by_partition.size())) - streams_queue_id = 0; - - auto & q = *streams_queue_by_partition[streams_queue_id]; - std::shared_ptr ret = nullptr; - assert(!q.empty()); - ret = q.front(); - q.pop(); - if (q.empty()) - streams_queue_id = removeQueue(streams_queue_id); - else - streams_queue_id = nextQueueId(streams_queue_id); + + auto ret = std::move(added_streams.front()); + added_streams.pop_front(); return ret; } - int exportAddedStreams(BlockInputStreams & ret_streams) + void exportAddedStreams(BlockInputStreams & ret_streams) { std::unique_lock lk(mu); for (auto & stream : added_streams) ret_streams.push_back(stream); - return added_streams.size(); } int addedStreamsCnt() @@ -80,42 +85,8 @@ class MultiPartitionStreamPool } private: - int removeQueue(int queue_id) - { - streams_queue_by_partition[queue_id] = nullptr; - if (queue_id != static_cast(streams_queue_by_partition.size()) - 1) - { - swap(streams_queue_by_partition[queue_id], streams_queue_by_partition.back()); - streams_queue_by_partition.pop_back(); - return queue_id; - } - else - { - streams_queue_by_partition.pop_back(); - return 0; - } - } - - int nextQueueId(int queue_id) const - { - if (queue_id + 1 < static_cast(streams_queue_by_partition.size())) - return queue_id + 1; - else - return 0; - } - - static void swap(std::shared_ptr>> & a, - std::shared_ptr>> & b) - { - a.swap(b); - } - - std::vector< - std::shared_ptr>>> - streams_queue_by_partition; - std::vector> added_streams; - int streams_queue_id = 0; + std::deque added_streams; + bool is_cancelled; std::mutex mu; }; @@ -182,6 +153,8 @@ class MultiplexInputStream final : public IProfilingBlockInputStream child->cancel(kill); } } + + shared_pool->cancel(kill); } Block getHeader() const override { return children.at(0)->getHeader(); }