Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8507
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
gengliqi authored and ti-chi-bot committed Dec 14, 2023
1 parent b1373b8 commit 1241207
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
51 changes: 35 additions & 16 deletions dbms/src/DataStreams/MultiplexInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,56 @@ class MultiPartitionStreamPool
public:
MultiPartitionStreamPool() = default;

void cancel(bool kill)
{
std::deque<BlockInputStreamPtr> tmp_streams;
{
std::unique_lock lk(mu);
if (is_cancelled)
return;

is_cancelled = true;
tmp_streams.swap(added_streams);
}

for (auto & stream : tmp_streams)
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->cancel(kill);
}
}

void addPartitionStreams(const BlockInputStreams & cur_streams)
{
if (cur_streams.empty())
return;
std::unique_lock lk(mu);
<<<<<<< HEAD
streams_queue_by_partition.push_back(
std::make_shared<std::queue<std::shared_ptr<IBlockInputStream>>>());
for (const auto & stream : cur_streams)
streams_queue_by_partition.back()->push(stream);
=======
>>>>>>> d344d9a872 (Process streams of partition tables one by one in MultiplexInputStream (#8507))
added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end());
}

std::shared_ptr<IBlockInputStream> pickOne()
BlockInputStreamPtr pickOne()
{
std::unique_lock lk(mu);
if (streams_queue_by_partition.empty())
if (added_streams.empty())
return nullptr;
if (streams_queue_id >= static_cast<int>(streams_queue_by_partition.size()))
streams_queue_id = 0;

auto & q = *streams_queue_by_partition[streams_queue_id];
std::shared_ptr<IBlockInputStream> ret = nullptr;
assert(!q.empty());
ret = q.front();
q.pop();
if (q.empty())
streams_queue_id = removeQueue(streams_queue_id);
else
streams_queue_id = nextQueueId(streams_queue_id);

auto ret = std::move(added_streams.front());
added_streams.pop_front();
return ret;
}

int exportAddedStreams(BlockInputStreams & ret_streams)
void exportAddedStreams(BlockInputStreams & ret_streams)
{
std::unique_lock lk(mu);
for (auto & stream : added_streams)
ret_streams.push_back(stream);
return added_streams.size();
}

int addedStreamsCnt()
Expand All @@ -80,6 +92,7 @@ class MultiPartitionStreamPool
}

private:
<<<<<<< HEAD
int removeQueue(int queue_id)
{
streams_queue_by_partition[queue_id] = nullptr;
Expand Down Expand Up @@ -116,6 +129,10 @@ class MultiPartitionStreamPool
streams_queue_by_partition;
std::vector<std::shared_ptr<IBlockInputStream>> added_streams;
int streams_queue_id = 0;
=======
std::deque<BlockInputStreamPtr> added_streams;
bool is_cancelled;
>>>>>>> d344d9a872 (Process streams of partition tables one by one in MultiplexInputStream (#8507))
std::mutex mu;
};

Expand Down Expand Up @@ -182,6 +199,8 @@ class MultiplexInputStream final : public IProfilingBlockInputStream
child->cancel(kill);
}
}

shared_pool->cancel(kill);
}

Block getHeader() const override { return children.at(0)->getHeader(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ class UnorderedInputStream : public IProfilingBlockInputStream
if (is_stopped.compare_exchange_strong(ori, true))
{
task_pool->decreaseUnorderedInputStreamRefCount();
<<<<<<< HEAD
LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->poolId(), ref_no);
=======
LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->pool_id, ref_no);
>>>>>>> d344d9a872 (Process streams of partition tables one by one in MultiplexInputStream (#8507))
}
}

Expand Down

0 comments on commit 1241207

Please sign in to comment.