Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pick](branch-2.1) pick #41555 #41592 #38204 #41781

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,13 @@ class OperatorBase {

virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
[[nodiscard]] bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
Expand All @@ -268,7 +271,7 @@ class OperatorBase {
OperatorXPtr _child_x = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
bool _followed_by_shuffled_operator = false;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
} while (block->rows() == 0 && !*eos);

local_state.reached_limit(block, eos);
if (*eos) {
local_state._finish_dependency->set_always_ready();
}
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
}
}

bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
return Status::OK();
}
[[nodiscard]] int get_child_count() const { return _child_size; }
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
bool _has_data(RuntimeState* state) const {
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase {

[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
Expand Down Expand Up @@ -627,8 +625,6 @@ class DataSinkOperatorXBase : public OperatorBase {
: DataDistribution(ExchangeType::NOOP);
}

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
Expand Down
43 changes: 23 additions & 20 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs,
OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe,
int child_idx, const bool followed_by_shuffled_join) {
int child_idx, const bool followed_by_shuffled_operator) {
// propagate error case
if (*node_idx >= tnodes.size()) {
// TODO: print thrift msg
Expand All @@ -782,11 +782,11 @@ Status PipelineXFragmentContext::_create_tree_helper(
const TPlanNode& tnode = tnodes[*node_idx];

int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
OperatorXPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
parent == nullptr ? -1 : parent->node_id(), child_idx,
followed_by_shuffled_join));
followed_by_shuffled_operator));

// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
Expand All @@ -797,7 +797,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
}

/**
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
*
* For plan:
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
Expand All @@ -811,8 +811,8 @@ Status PipelineXFragmentContext::_create_tree_helper(
cur_pipe->operator_xs().empty()
? cur_pipe->sink_x()->require_shuffled_data_distribution()
: op->require_shuffled_data_distribution();
current_followed_by_shuffled_join =
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
current_followed_by_shuffled_operator =
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
require_shuffled_data_distribution;

cur_pipe->_name.push_back('-');
Expand All @@ -823,7 +823,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
cur_pipe, i, current_followed_by_shuffled_join));
cur_pipe, i, current_followed_by_shuffled_operator));

// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
Expand Down Expand Up @@ -865,13 +865,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
*/
const bool followed_by_shuffled_join =
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join()
: cur_pipe->sink_x()->followed_by_shuffled_join();
const bool followed_by_shuffled_operator =
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator()
: cur_pipe->sink_x()->followed_by_shuffled_operator();
const bool should_disable_bucket_shuffle =
bucket_seq_to_instance_idx.empty() &&
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
followed_by_shuffled_join;
followed_by_shuffled_operator;
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id,
should_disable_bucket_shuffle ? _total_instances : _num_instances,
Expand Down Expand Up @@ -1047,7 +1047,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_join) {
const bool followed_by_shuffled_operator) {
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
Expand Down Expand Up @@ -1121,7 +1121,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
Expand Down Expand Up @@ -1152,7 +1152,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1203,8 +1203,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand All @@ -1225,8 +1225,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
Expand Down Expand Up @@ -1256,6 +1256,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::UNION_NODE: {
int child_count = tnode.num_children;
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(cur_pipe->add_operator(op));

const auto downstream_pipeline_id = cur_pipe->id();
Expand Down Expand Up @@ -1298,7 +1299,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1338,7 +1339,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand All @@ -1349,11 +1350,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
Expand Down
28 changes: 26 additions & 2 deletions be/src/vec/functions/function_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ struct NameQuoteImpl {
}
};

struct NameStringLenght {
struct NameStringLength {
static constexpr auto name = "length";
};

Expand All @@ -104,6 +104,28 @@ struct StringLengthImpl {
}
};

struct NameCrc32 {
static constexpr auto name = "crc32";
};

struct Crc32Impl {
using ReturnType = DataTypeInt64;
static constexpr auto TYPE_INDEX = TypeIndex::String;
using Type = String;
using ReturnColumnType = ColumnVector<Int64>;

static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
PaddedPODArray<Int64>& res) {
auto size = offsets.size();
res.resize(size);
for (int i = 0; i < size; ++i) {
res[i] = crc32_z(0L, (const unsigned char*)data.data() + offsets[i - 1],
offsets[i] - offsets[i - 1]);
}
return Status::OK();
}
};

struct NameStringUtf8Length {
static constexpr auto name = "char_length";
};
Expand Down Expand Up @@ -1073,7 +1095,8 @@ using StringFindInSetImpl = StringFunctionImpl<LeftDataType, RightDataType, Find

// ready for regist function
using FunctionStringASCII = FunctionUnaryToType<StringASCII, NameStringASCII>;
using FunctionStringLength = FunctionUnaryToType<StringLengthImpl, NameStringLenght>;
using FunctionStringLength = FunctionUnaryToType<StringLengthImpl, NameStringLength>;
using FunctionCrc32 = FunctionUnaryToType<Crc32Impl, NameCrc32>;
using FunctionStringUTF8Length = FunctionUnaryToType<StringUtf8LengthImpl, NameStringUtf8Length>;
using FunctionStringSpace = FunctionUnaryToType<StringSpace, NameStringSpace>;
using FunctionStringStartsWith =
Expand Down Expand Up @@ -1111,6 +1134,7 @@ using FunctionStringRPad = FunctionStringPad<StringRPad>;
void register_function_string(SimpleFunctionFactory& factory) {
factory.register_function<FunctionStringASCII>();
factory.register_function<FunctionStringLength>();
factory.register_function<FunctionCrc32>();
factory.register_function<FunctionStringUTF8Length>();
factory.register_function<FunctionStringSpace>();
factory.register_function<FunctionStringStartsWith>();
Expand Down
Loading
Loading