Skip to content

Commit

Permalink
fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed May 5, 2023
1 parent 5243326 commit 429db75
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 45 deletions.
80 changes: 54 additions & 26 deletions src/Processors/Transforms/InnerShuffleTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Processors/Port.h>
#include <Processors/Transforms/InnerShuffleTransform.h>
#include <Common/WeakHash.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include "InnerShuffleTransform.h"

Expand Down Expand Up @@ -71,6 +72,11 @@ IProcessor::Status InnerShuffleScatterTransform::prepare()
}
return Status::PortFull;
}
// There is pending chunk in output port which is not pulled out.
if (!output.canPush())
{
return Status::PortFull;
}
if (input.isFinished())
{
output.finish();
Expand Down Expand Up @@ -149,21 +155,16 @@ InnerShuffleDispatchTransform::InnerShuffleDispatchTransform(size_t input_nums_,
IProcessor::Status InnerShuffleDispatchTransform::prepare()
{
// If there is any output port is finished, make it finished.
bool is_output_finished = false;
bool is_output_finished = true;
for (auto & iter : outputs)
{
if (iter.isFinished())
if (!iter.isFinished())
{
is_output_finished = true;
break;
is_output_finished = false;
}
}
if (is_output_finished)
{
for (auto & iter : outputs)
{
iter.finish();
}
for (auto & iter : inputs)
{
iter.close();
Expand All @@ -182,18 +183,25 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare()
size_t i = 0;
for (auto & outport : outputs)
{
if (outport.canPush())
if (outport.isFinished())
{
output_chunks[i].clear();
i += 1;
continue;
}
if (!output_chunks[i].empty())
{
if (!output_chunks[i].empty())
if (outport.canPush())
{
has_pending_chunks = true;
Chunk tmp_chunk;
tmp_chunk.swap(output_chunks[i].front());
output_chunks[i].pop_front();
outport.push(std::move(tmp_chunk));
has_chunks_out = true;
}
}
if (!outport.canPush())
has_pending_chunks = true;
i += 1;
}
// If there is no output port available, return PortFull
Expand All @@ -212,14 +220,17 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare()
input.setNeeded();
if (input.hasData())
{
input_chunks.emplace_back(input.pull(true));
auto chunk = input.pull(true);
input_chunks.emplace_back(std::move(chunk));
has_input = true;
}
}
if (all_input_finished)
{
for (auto & outport : outputs)
{
if (!outport.isFinished() && !outport.canPush())
return Status::PortFull;
outport.finish();
}
return Status::Finished;
Expand All @@ -243,26 +254,39 @@ void InnerShuffleDispatchTransform::work()
size_t i = 0;
for (const auto & chunk : chunk_list->chunks)
{
if (output_chunks[i].empty() || output_chunks[i].back().getNumRows() >= DEFAULT_BLOCK_SIZE)
auto num_rows = chunk.getNumRows();
if (num_rows)
{
Chunk new_chunk(chunk.getColumns(), chunk.getNumRows());
output_chunks[i].push_back(std::move(new_chunk));
}
else
{
auto & last_chunk = output_chunks[i].back();
auto src_cols = chunk.getColumns();
auto dst_cols = last_chunk.mutateColumns();
for (size_t n = 0; n < src_cols.size(); ++n)
if (output_chunks[i].empty() || output_chunks[i].back().getNumRows() >= DEFAULT_BLOCK_SIZE)
{
Columns columns;
columns.reserve(chunk.getColumns().size());
for (const auto & col : chunk.getColumns())
{
columns.emplace_back(recursiveRemoveSparse(col));
}
Chunk new_chunk(columns, num_rows);
output_chunks[i].push_back(std::move(new_chunk));
}
else
{
dst_cols[n]->insertRangeFrom(*src_cols[n], 0, src_cols[n]->size());
auto & last_chunk = output_chunks[i].back();
auto src_cols = chunk.getColumns();
auto dst_cols = last_chunk.mutateColumns();
for (size_t n = 0; n < src_cols.size(); ++n)
{
auto src_col = recursiveRemoveSparse(src_cols[n]);
dst_cols[n]->insertRangeFrom(*src_col, 0, src_col->size());
}
auto rows = dst_cols[0]->size();
Chunk new_chunk(std::move(dst_cols), rows);
output_chunks[i].back().swap(new_chunk);
}
auto rows = dst_cols[0]->size();
output_chunks[i].back().setColumns(std::move(dst_cols), rows);
}
i += 1;
}
}
input_chunks.clear();
has_input = false;
}

Expand Down Expand Up @@ -298,6 +322,10 @@ IProcessor::Status InnerShuffleGatherTransform::prepare()
}
return Status::PortFull;
}
if (!outputs.front().canPush())
{
return Status::PortFull;
}
size_t i = 0;
bool all_input_finished = true;
while (i < inputs.size())
Expand Down Expand Up @@ -333,6 +361,6 @@ IProcessor::Status InnerShuffleGatherTransform::prepare()
void InnerShuffleGatherTransform::work()
{
has_input = false;
has_output = true;
has_output = output_chunk.getNumRows() > 0;
}
}
40 changes: 21 additions & 19 deletions tests/queries/0_stateless/02236_explain_pipeline_join.reference
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@ ExpressionTransform
(Join)
Resize 16 → 1
JoiningTransform × 16 2 → 1
InnerShuffleGatherTransform × 16 16 → 1
InnerShuffleScatterTransform × 16 1 → 16
Resize 1 → 16
(Expression)
ExpressionTransform
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1
(Expression)
FillingRightJoinSide × 16
InnerShuffleGatherTransform × 16 16 → 1
InnerShuffleScatterTransform × 16 1 → 16
Resize 1 → 16
ExpressionTransform
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1
InnerShuffleGatherTransform × 16 8 → 1
InnerShuffleDispatchTransform × 8 2 → 16
InnerShuffleScatterTransform × 16
Resize 1 → 16
(Expression)
ExpressionTransform
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1
(Expression)
FillingRightJoinSide × 16
InnerShuffleGatherTransform × 16 8 → 1
InnerShuffleDispatchTransform × 8 2 → 16
InnerShuffleScatterTransform × 16
Resize 1 → 16
ExpressionTransform
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1

0 comments on commit 429db75

Please sign in to comment.