Skip to content

Commit

Permalink
Process streams of partition tables one by one in MultiplexInputStream (
Browse files Browse the repository at this point in the history
#8507) (#8522)

close #8505
  • Loading branch information
ti-chi-bot authored Dec 15, 2023
1 parent 7af667a commit e4a30bc
Showing 1 changed file with 29 additions and 56 deletions.
85 changes: 29 additions & 56 deletions dbms/src/DataStreams/MultiplexInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,49 @@ class MultiPartitionStreamPool
public:
MultiPartitionStreamPool() = default;

void cancel(bool kill)
{
std::deque<BlockInputStreamPtr> tmp_streams;
{
std::unique_lock lk(mu);
if (is_cancelled)
return;

is_cancelled = true;
tmp_streams.swap(added_streams);
}

for (auto & stream : tmp_streams)
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->cancel(kill);
}
}

void addPartitionStreams(const BlockInputStreams & cur_streams)
{
if (cur_streams.empty())
return;
std::unique_lock lk(mu);
streams_queue_by_partition.push_back(
std::make_shared<std::queue<std::shared_ptr<IBlockInputStream>>>());
for (const auto & stream : cur_streams)
streams_queue_by_partition.back()->push(stream);
added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end());
}

std::shared_ptr<IBlockInputStream> pickOne()
BlockInputStreamPtr pickOne()
{
std::unique_lock lk(mu);
if (streams_queue_by_partition.empty())
if (added_streams.empty())
return nullptr;
if (streams_queue_id >= static_cast<int>(streams_queue_by_partition.size()))
streams_queue_id = 0;

auto & q = *streams_queue_by_partition[streams_queue_id];
std::shared_ptr<IBlockInputStream> ret = nullptr;
assert(!q.empty());
ret = q.front();
q.pop();
if (q.empty())
streams_queue_id = removeQueue(streams_queue_id);
else
streams_queue_id = nextQueueId(streams_queue_id);

auto ret = std::move(added_streams.front());
added_streams.pop_front();
return ret;
}

int exportAddedStreams(BlockInputStreams & ret_streams)
void exportAddedStreams(BlockInputStreams & ret_streams)
{
std::unique_lock lk(mu);
for (auto & stream : added_streams)
ret_streams.push_back(stream);
return added_streams.size();
}

int addedStreamsCnt()
Expand All @@ -80,42 +85,8 @@ class MultiPartitionStreamPool
}

private:
int removeQueue(int queue_id)
{
streams_queue_by_partition[queue_id] = nullptr;
if (queue_id != static_cast<int>(streams_queue_by_partition.size()) - 1)
{
swap(streams_queue_by_partition[queue_id], streams_queue_by_partition.back());
streams_queue_by_partition.pop_back();
return queue_id;
}
else
{
streams_queue_by_partition.pop_back();
return 0;
}
}

int nextQueueId(int queue_id) const
{
if (queue_id + 1 < static_cast<int>(streams_queue_by_partition.size()))
return queue_id + 1;
else
return 0;
}

static void swap(std::shared_ptr<std::queue<std::shared_ptr<IBlockInputStream>>> & a,
std::shared_ptr<std::queue<std::shared_ptr<IBlockInputStream>>> & b)
{
a.swap(b);
}

std::vector<
std::shared_ptr<std::queue<
std::shared_ptr<IBlockInputStream>>>>
streams_queue_by_partition;
std::vector<std::shared_ptr<IBlockInputStream>> added_streams;
int streams_queue_id = 0;
std::deque<BlockInputStreamPtr> added_streams;
bool is_cancelled;
std::mutex mu;
};

Expand Down Expand Up @@ -182,6 +153,8 @@ class MultiplexInputStream final : public IProfilingBlockInputStream
child->cancel(kill);
}
}

shared_pool->cancel(kill);
}

Block getHeader() const override { return children.at(0)->getHeader(); }
Expand Down

0 comments on commit e4a30bc

Please sign in to comment.