Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8507
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
gengliqi authored and ti-chi-bot committed Dec 14, 2023
1 parent ccd0b84 commit e9d8180
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 53 deletions.
82 changes: 29 additions & 53 deletions dbms/src/DataStreams/MultiplexInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,43 +33,49 @@ class MultiPartitionStreamPool
public:
MultiPartitionStreamPool() = default;

void cancel(bool kill)
{
std::deque<BlockInputStreamPtr> tmp_streams;
{
std::unique_lock lk(mu);
if (is_cancelled)
return;

is_cancelled = true;
tmp_streams.swap(added_streams);
}

for (auto & stream : tmp_streams)
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->cancel(kill);
}
}

void addPartitionStreams(const BlockInputStreams & cur_streams)
{
if (cur_streams.empty())
return;
std::unique_lock lk(mu);
streams_queue_by_partition.push_back(std::make_shared<std::queue<std::shared_ptr<IBlockInputStream>>>());
for (const auto & stream : cur_streams)
streams_queue_by_partition.back()->push(stream);
added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end());
}

std::shared_ptr<IBlockInputStream> pickOne()
BlockInputStreamPtr pickOne()
{
std::unique_lock lk(mu);
if (streams_queue_by_partition.empty())
if (added_streams.empty())
return nullptr;
if (streams_queue_id >= static_cast<int>(streams_queue_by_partition.size()))
streams_queue_id = 0;

auto & q = *streams_queue_by_partition[streams_queue_id];
std::shared_ptr<IBlockInputStream> ret = nullptr;
assert(!q.empty());
ret = q.front();
q.pop();
if (q.empty())
streams_queue_id = removeQueue(streams_queue_id);
else
streams_queue_id = nextQueueId(streams_queue_id);

auto ret = std::move(added_streams.front());
added_streams.pop_front();
return ret;
}

int exportAddedStreams(BlockInputStreams & ret_streams)
void exportAddedStreams(BlockInputStreams & ret_streams)
{
std::unique_lock lk(mu);
for (auto & stream : added_streams)
ret_streams.push_back(stream);
return added_streams.size();
}

int addedStreamsCnt()
Expand All @@ -79,40 +85,8 @@ class MultiPartitionStreamPool
}

private:
int removeQueue(int queue_id)
{
streams_queue_by_partition[queue_id] = nullptr;
if (queue_id != static_cast<int>(streams_queue_by_partition.size()) - 1)
{
swap(streams_queue_by_partition[queue_id], streams_queue_by_partition.back());
streams_queue_by_partition.pop_back();
return queue_id;
}
else
{
streams_queue_by_partition.pop_back();
return 0;
}
}

int nextQueueId(int queue_id) const
{
if (queue_id + 1 < static_cast<int>(streams_queue_by_partition.size()))
return queue_id + 1;
else
return 0;
}

static void swap(
std::shared_ptr<std::queue<std::shared_ptr<IBlockInputStream>>> & a,
std::shared_ptr<std::queue<std::shared_ptr<IBlockInputStream>>> & b)
{
a.swap(b);
}

std::vector<std::shared_ptr<std::queue<std::shared_ptr<IBlockInputStream>>>> streams_queue_by_partition;
std::vector<std::shared_ptr<IBlockInputStream>> added_streams;
int streams_queue_id = 0;
std::deque<BlockInputStreamPtr> added_streams;
bool is_cancelled;
std::mutex mu;
};

Expand Down Expand Up @@ -170,6 +144,8 @@ class MultiplexInputStream final : public IProfilingBlockInputStream
child->cancel(kill);
}
}

shared_pool->cancel(kill);
}

Block getHeader() const override { return children.at(0)->getHeader(); }
Expand Down
19 changes: 19 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ class UnorderedInputStream : public IProfilingBlockInputStream
LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
}

<<<<<<< HEAD
~UnorderedInputStream() override
{
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
}
=======
void cancel(bool /*kill*/) override { decreaseRefCount(true); }

~UnorderedInputStream() override { decreaseRefCount(false); }
>>>>>>> d344d9a872 (Process streams of partition tables one by one in MultiplexInputStream (#8507))

String getName() const override { return NAME; }

Expand All @@ -67,6 +73,19 @@ class UnorderedInputStream : public IProfilingBlockInputStream
}

protected:
<<<<<<< HEAD
=======
void decreaseRefCount(bool is_cancel)
{
bool ori = false;
if (is_stopped.compare_exchange_strong(ori, true))
{
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->pool_id, ref_no);
}
}

>>>>>>> d344d9a872 (Process streams of partition tables one by one in MultiplexInputStream (#8507))
Block readImpl() override
{
FilterPtr filter_ignored;
Expand Down

0 comments on commit e9d8180

Please sign in to comment.