diff --git a/src/Processors/Transforms/InnerShuffleTransform.cpp b/src/Processors/Transforms/InnerShuffleTransform.cpp index 8d481b4d35cc..1cf2b3c428ad 100644 --- a/src/Processors/Transforms/InnerShuffleTransform.cpp +++ b/src/Processors/Transforms/InnerShuffleTransform.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "InnerShuffleTransform.h" @@ -71,6 +72,11 @@ IProcessor::Status InnerShuffleScatterTransform::prepare() } return Status::PortFull; } + // There is pending chunk in output port which is not pulled out. + if (!output.canPush()) + { + return Status::PortFull; + } if (input.isFinished()) { output.finish(); @@ -149,21 +155,16 @@ InnerShuffleDispatchTransform::InnerShuffleDispatchTransform(size_t input_nums_, IProcessor::Status InnerShuffleDispatchTransform::prepare() { // If there is any output port is finished, make it finished. - bool is_output_finished = false; + bool is_output_finished = true; for (auto & iter : outputs) { - if (iter.isFinished()) + if (!iter.isFinished()) { - is_output_finished = true; - break; + is_output_finished = false; } } if (is_output_finished) { - for (auto & iter : outputs) - { - iter.finish(); - } for (auto & iter : inputs) { iter.close(); @@ -182,11 +183,16 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare() size_t i = 0; for (auto & outport : outputs) { - if (outport.canPush()) + if (outport.isFinished()) + { + output_chunks[i].clear(); + i += 1; + continue; + } + if (!output_chunks[i].empty()) { - if (!output_chunks[i].empty()) + if (outport.canPush()) { - has_pending_chunks = true; Chunk tmp_chunk; tmp_chunk.swap(output_chunks[i].front()); output_chunks[i].pop_front(); @@ -194,6 +200,8 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare() has_chunks_out = true; } } + if (!outport.canPush()) + has_pending_chunks = true; i += 1; } // If there is no output port available, return PortFull @@ -212,7 +220,8 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare() input.setNeeded(); if (input.hasData()) { - input_chunks.emplace_back(input.pull(true)); + auto chunk = input.pull(true); + input_chunks.emplace_back(std::move(chunk)); has_input = true; } } @@ -220,6 +229,8 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare() { for (auto & outport : outputs) { + if (!outport.isFinished() && !outport.canPush()) + return Status::PortFull; outport.finish(); } return Status::Finished; @@ -243,26 +254,39 @@ void InnerShuffleDispatchTransform::work() size_t i = 0; for (const auto & chunk : chunk_list->chunks) { - if (output_chunks[i].empty() || output_chunks[i].back().getNumRows() >= DEFAULT_BLOCK_SIZE) + auto num_rows = chunk.getNumRows(); + if (num_rows) { - Chunk new_chunk(chunk.getColumns(), chunk.getNumRows()); - output_chunks[i].push_back(std::move(new_chunk)); - } - else - { - auto & last_chunk = output_chunks[i].back(); - auto src_cols = chunk.getColumns(); - auto dst_cols = last_chunk.mutateColumns(); - for (size_t n = 0; n < src_cols.size(); ++n) + if (output_chunks[i].empty() || output_chunks[i].back().getNumRows() >= DEFAULT_BLOCK_SIZE) + { + Columns columns; + columns.reserve(chunk.getColumns().size()); + for (const auto & col : chunk.getColumns()) + { + columns.emplace_back(recursiveRemoveSparse(col)); + } + Chunk new_chunk(columns, num_rows); + output_chunks[i].push_back(std::move(new_chunk)); + } + else { - dst_cols[n]->insertRangeFrom(*src_cols[n], 0, src_cols[n]->size()); + auto & last_chunk = output_chunks[i].back(); + auto src_cols = chunk.getColumns(); + auto dst_cols = last_chunk.mutateColumns(); + for (size_t n = 0; n < src_cols.size(); ++n) + { + auto src_col = recursiveRemoveSparse(src_cols[n]); + dst_cols[n]->insertRangeFrom(*src_col, 0, src_col->size()); + } + auto rows = dst_cols[0]->size(); + Chunk new_chunk(std::move(dst_cols), rows); + output_chunks[i].back().swap(new_chunk); } - auto rows = dst_cols[0]->size(); - output_chunks[i].back().setColumns(std::move(dst_cols), rows); } i += 1; } } + input_chunks.clear(); has_input = false; } @@ -298,6 +322,10 @@ IProcessor::Status InnerShuffleGatherTransform::prepare() } return Status::PortFull; } + if (!outputs.front().canPush()) + { + return Status::PortFull; + } size_t i = 0; bool all_input_finished = true; while (i < inputs.size()) @@ -333,6 +361,6 @@ IProcessor::Status InnerShuffleGatherTransform::prepare() void InnerShuffleGatherTransform::work() { has_input = false; - has_output = true; + has_output = output_chunk.getNumRows() > 0; } } diff --git a/tests/queries/0_stateless/02236_explain_pipeline_join.reference b/tests/queries/0_stateless/02236_explain_pipeline_join.reference index 8b47622e90e7..18e5db7aa4ff 100644 --- a/tests/queries/0_stateless/02236_explain_pipeline_join.reference +++ b/tests/queries/0_stateless/02236_explain_pipeline_join.reference @@ -3,22 +3,24 @@ ExpressionTransform (Join) Resize 16 → 1 JoiningTransform × 16 2 → 1 - InnerShuffleGatherTransform × 16 16 → 1 - InnerShuffleScatterTransform × 16 1 → 16 - Resize 1 → 16 - (Expression) - ExpressionTransform - (Limit) - Limit - (ReadFromStorage) - Numbers 0 → 1 - (Expression) - FillingRightJoinSide × 16 - InnerShuffleGatherTransform × 16 16 → 1 - InnerShuffleScatterTransform × 16 1 → 16 - Resize 1 → 16 - ExpressionTransform - (Limit) - Limit - (ReadFromStorage) - Numbers 0 → 1 + InnerShuffleGatherTransform × 16 8 → 1 + InnerShuffleDispatchTransform × 8 2 → 16 + InnerShuffleScatterTransform × 16 + Resize 1 → 16 + (Expression) + ExpressionTransform + (Limit) + Limit + (ReadFromStorage) + Numbers 0 → 1 + (Expression) + FillingRightJoinSide × 16 + InnerShuffleGatherTransform × 16 8 → 1 + InnerShuffleDispatchTransform × 8 2 → 16 + InnerShuffleScatterTransform × 16 + Resize 1 → 16 + ExpressionTransform + (Limit) + Limit + (ReadFromStorage) + Numbers 0 → 1