Skip to content

Commit

Permalink
[pick](branch-2.1) pick apache#41555 apache#41592 apache#38204 (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 14, 2024
1 parent ec0c008 commit f112af0
Show file tree
Hide file tree
Showing 21 changed files with 174 additions and 38 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final
// Whether this build sink asks for shuffled input: true unless the join is a
// broadcast join or a null-aware left anti join.
bool require_shuffled_data_distribution() const override {
    if (_is_broadcast_join) {
        return false;
    }
    return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
// Reports whether this build sink belongs to a join whose distribution type is
// PARTITIONED. The fragment builder combines this with
// require_shuffled_data_distribution() to decide the followed-by-shuffled-operator
// flag passed to child operators.
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
// Whether this probe operator asks for shuffled input. Broadcast joins and
// null-aware left anti joins are exempt; every other join requires it.
bool require_shuffled_data_distribution() const override {
    const bool exempt =
            _is_broadcast_join || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
    return !exempt;
}
bool is_shuffled_hash_join() const override {
// Reports whether this probe operator belongs to a join whose distribution type
// is PARTITIONED.
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,13 @@ class OperatorBase {

// Default implementation does nothing and reports success; operators override
// it when they have something to do on revocation.
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
// Default: no data distribution requirement. The fragment builder ORs the
// result of this call into its `_require_bucket_distribution` state.
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
// Returns the flag stored by set_followed_by_shuffled_operator() (false until
// set). The fragment builder reads it when adding a local exchange to decide
// whether bucket shuffle should be disabled.
[[nodiscard]] bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
// Records whether this operator is followed by a shuffled operator. Called by
// the fragment builder while it creates operators from the plan.
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
// Default: not a shuffled operator. Hash join operators override this to
// report a PARTITIONED distribution; union operators override it to forward
// their followed-by-shuffled-operator flag.
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
// Default: shuffled input is not required. Join and union operators override
// this with their own conditions.
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
Expand All @@ -268,7 +271,7 @@ class OperatorBase {
OperatorXPtr _child_x = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
bool _followed_by_shuffled_operator = false;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final
// Whether this partitioned probe operator asks for shuffled input: every join
// type does, except a null-aware left anti join.
bool require_shuffled_data_distribution() const override {
    const bool is_null_aware_anti = (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
    return !is_null_aware_anti;
}
bool is_shuffled_hash_join() const override {
// Reports whether this partitioned probe operator belongs to a join whose
// distribution type is PARTITIONED.
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX
// Whether this partitioned sink asks for shuffled input: every join type does,
// except a null-aware left anti join.
bool require_shuffled_data_distribution() const override {
    if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
        return false;
    }
    return true;
}
bool is_shuffled_hash_join() const override {
// Reports whether this partitioned sink belongs to a join whose distribution
// type is PARTITIONED.
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
} while (block->rows() == 0 && !*eos);

local_state.reached_limit(block, eos);
if (*eos) {
local_state._finish_dependency->set_always_ready();
}
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
}
}

// The union sink asks for shuffled input exactly when its
// followed-by-shuffled-operator flag is set.
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

// The union sink counts as a shuffled operator exactly when its
// followed-by-shuffled-operator flag is set.
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
// Accessor for `_first_materialized_child_idx`.
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
return Status::OK();
}
// Accessor for `_child_size`.
[[nodiscard]] int get_child_count() const { return _child_size; }
// The union source asks for shuffled input exactly when its
// followed-by-shuffled-operator flag is set. The fragment builder sets that
// flag from its `_require_bucket_distribution` state when it creates the
// union source.
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

// The union source counts as a shuffled operator exactly when its
// followed-by-shuffled-operator flag is set.
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
bool _has_data(RuntimeState* state) const {
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase {

// Default: the operator never reports that it can terminate early.
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
Expand Down Expand Up @@ -627,8 +625,6 @@ class DataSinkOperatorXBase : public OperatorBase {
: DataDistribution(ExchangeType::NOOP);
}

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

// Always fails with an InternalError: this overload is not expected to be
// called on a sink operator.
// NOTE(review): presumably closing goes through a different path; confirm.
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
Expand Down
43 changes: 23 additions & 20 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs,
OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe,
int child_idx, const bool followed_by_shuffled_join) {
int child_idx, const bool followed_by_shuffled_operator) {
// propagate error case
if (*node_idx >= tnodes.size()) {
// TODO: print thrift msg
Expand All @@ -782,11 +782,11 @@ Status PipelineXFragmentContext::_create_tree_helper(
const TPlanNode& tnode = tnodes[*node_idx];

int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
OperatorXPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
parent == nullptr ? -1 : parent->node_id(), child_idx,
followed_by_shuffled_join));
followed_by_shuffled_operator));

// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
Expand All @@ -797,7 +797,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
}

/**
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
*
* For plan:
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
Expand All @@ -811,8 +811,8 @@ Status PipelineXFragmentContext::_create_tree_helper(
cur_pipe->operator_xs().empty()
? cur_pipe->sink_x()->require_shuffled_data_distribution()
: op->require_shuffled_data_distribution();
current_followed_by_shuffled_join =
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
current_followed_by_shuffled_operator =
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
require_shuffled_data_distribution;

cur_pipe->_name.push_back('-');
Expand All @@ -823,7 +823,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
cur_pipe, i, current_followed_by_shuffled_join));
cur_pipe, i, current_followed_by_shuffled_operator));

// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
Expand Down Expand Up @@ -865,13 +865,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
* So co-located operators (e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
*/
const bool followed_by_shuffled_join =
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join()
: cur_pipe->sink_x()->followed_by_shuffled_join();
const bool followed_by_shuffled_operator =
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator()
: cur_pipe->sink_x()->followed_by_shuffled_operator();
const bool should_disable_bucket_shuffle =
bucket_seq_to_instance_idx.empty() &&
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
followed_by_shuffled_join;
followed_by_shuffled_operator;
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id,
should_disable_bucket_shuffle ? _total_instances : _num_instances,
Expand Down Expand Up @@ -1047,7 +1047,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_join) {
const bool followed_by_shuffled_operator) {
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
Expand Down Expand Up @@ -1121,7 +1121,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
Expand Down Expand Up @@ -1152,7 +1152,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1203,8 +1203,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand All @@ -1225,8 +1225,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
Expand Down Expand Up @@ -1256,6 +1256,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::UNION_NODE: {
int child_count = tnode.num_children;
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(cur_pipe->add_operator(op));

const auto downstream_pipeline_id = cur_pipe->id();
Expand Down Expand Up @@ -1298,7 +1299,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1338,7 +1339,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand All @@ -1349,11 +1350,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
Expand Down
28 changes: 26 additions & 2 deletions be/src/vec/functions/function_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ struct NameQuoteImpl {
}
};

struct NameStringLenght {
// Name tag carrying the function name "length"; passed as the name parameter
// of the FunctionStringLength alias.
struct NameStringLength {
static constexpr auto name = "length";
};

Expand All @@ -104,6 +104,28 @@ struct StringLengthImpl {
}
};

// Name tag carrying the function name "crc32"; passed as the name parameter
// of the FunctionCrc32 alias.
struct NameCrc32 {
static constexpr auto name = "crc32";
};

// Implementation of the `crc32` string function: computes the CRC-32 checksum
// (via zlib's crc32_z, starting from an initial value of 0) of every string in
// a string column and stores it as an Int64.
struct Crc32Impl {
    using ReturnType = DataTypeInt64;
    static constexpr auto TYPE_INDEX = TypeIndex::String;
    using Type = String;
    using ReturnColumnType = ColumnVector<Int64>;

    // @param data     concatenated bytes of all strings in the column
    // @param offsets  offsets[i] is the end of row i inside `data`; row i
    //                 starts at offsets[i - 1]
    // @param res      output; resized to one checksum per row
    // @return Status::OK() always
    static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
                         PaddedPODArray<Int64>& res) {
        const auto size = offsets.size();
        res.resize(size);

        const auto* bytes = reinterpret_cast<const unsigned char*>(data.data());
        // Use a 64-bit signed index: it avoids comparing a 32-bit `int` against
        // the unsigned row count (which would overflow past INT_MAX rows) while
        // keeping `i - 1` a plain -1 for the first row.
        // NOTE(review): row 0 reads offsets[-1], exactly as the previous loop
        // did; this assumes the offsets array is readable one element before
        // its start and holds 0 there. Confirm against PaddedPODArray.
        const auto row_count = static_cast<Int64>(size);
        for (Int64 i = 0; i < row_count; ++i) {
            const auto begin = offsets[i - 1];
            const auto length = offsets[i] - begin;
            res[i] = crc32_z(0L, bytes + begin, length);
        }
        return Status::OK();
    }
};

// Name tag carrying the function name "char_length"; passed as the name
// parameter of the FunctionStringUTF8Length alias.
struct NameStringUtf8Length {
static constexpr auto name = "char_length";
};
Expand Down Expand Up @@ -1073,7 +1095,8 @@ using StringFindInSetImpl = StringFunctionImpl<LeftDataType, RightDataType, Find

// Function aliases, ready to be registered
using FunctionStringASCII = FunctionUnaryToType<StringASCII, NameStringASCII>;
using FunctionStringLength = FunctionUnaryToType<StringLengthImpl, NameStringLenght>;
using FunctionStringLength = FunctionUnaryToType<StringLengthImpl, NameStringLength>;
using FunctionCrc32 = FunctionUnaryToType<Crc32Impl, NameCrc32>;
using FunctionStringUTF8Length = FunctionUnaryToType<StringUtf8LengthImpl, NameStringUtf8Length>;
using FunctionStringSpace = FunctionUnaryToType<StringSpace, NameStringSpace>;
using FunctionStringStartsWith =
Expand Down Expand Up @@ -1111,6 +1134,7 @@ using FunctionStringRPad = FunctionStringPad<StringRPad>;
void register_function_string(SimpleFunctionFactory& factory) {
factory.register_function<FunctionStringASCII>();
factory.register_function<FunctionStringLength>();
factory.register_function<FunctionCrc32>();
factory.register_function<FunctionStringUTF8Length>();
factory.register_function<FunctionStringSpace>();
factory.register_function<FunctionStringStartsWith>();
Expand Down
Loading

0 comments on commit f112af0

Please sign in to comment.