Commit

[pick](branch-3.0) pick #41292 #41589 #41628 #41743 #41845 #41857 #41902
Gabriel39 and BiteTheDDDDt authored Oct 17, 2024
1 parent e634f0d commit 9066903
Showing 25 changed files with 167 additions and 99 deletions.
4 changes: 2 additions & 2 deletions be/src/exprs/bloom_filter_func.h
@@ -157,19 +157,19 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
}

Status merge(BloomFilterFuncBase* bloomfilter_func) {
DCHECK(bloomfilter_func != nullptr);
DCHECK(bloomfilter_func->_bloom_filter != nullptr);
// If `_inited` is false, no memory has been allocated in this bloom filter yet and this is the
// first call to `merge`. So we just reuse the other bloom filter, and we don't need to
// allocate memory again.
if (!_inited) {
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
DCHECK(_bloom_filter == nullptr);
DCHECK(bloomfilter_func != nullptr);
_bloom_filter = bloomfilter_func->_bloom_filter;
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_inited = true;
return Status::OK();
}
DCHECK(bloomfilter_func != nullptr);
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
return Status::InternalError(
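For readers skimming the hunk above: the pattern is that the first `merge` call adopts the incoming filter's storage instead of allocating a new one, and only later calls OR the bit arrays together. A minimal standalone sketch of that idea, using simplified stand-in types rather than the real `BloomFilterFuncBase`/`BloomFilterAdaptor` classes:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Simplified stand-in for the real bloom filter classes in
// be/src/exprs/bloom_filter_func.h.
struct FakeBloomFilter {
    std::vector<bool> bits;
    void merge(const FakeBloomFilter& other) {
        for (size_t i = 0; i < bits.size() && i < other.bits.size(); ++i) {
            bits[i] = bits[i] || other.bits[i];
        }
    }
};

class MergeableFilter {
public:
    // First merge: adopt the other filter's storage instead of allocating.
    // Subsequent merges: OR the bit arrays together, sizes permitting.
    bool merge(MergeableFilter* other) {
        assert(other != nullptr && other->_filter != nullptr);
        if (!_inited) {
            assert(_filter == nullptr);
            _filter = other->_filter;   // share the allocation, don't copy
            _alloced = other->_alloced;
            _inited = true;
            return true;
        }
        if (_alloced != other->_alloced) {
            return false;               // mismatched sizes cannot be merged
        }
        _filter->merge(*other->_filter);
        return true;
    }

private:
    std::shared_ptr<FakeBloomFilter> _filter;
    size_t _alloced = 0;
    bool _inited = false;
};
```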
13 changes: 6 additions & 7 deletions be/src/exprs/runtime_filter.cpp
@@ -472,10 +472,10 @@ class RuntimePredicateWrapper {
const TExpr& probe_expr);

Status merge(const RuntimePredicateWrapper* wrapper) {
if (is_ignored() || wrapper->is_ignored()) {
_context->ignored = true;
if (wrapper->is_ignored()) {
return Status::OK();
}
_context->ignored = false;

bool can_not_merge_in_or_bloom =
_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -493,7 +493,10 @@

switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
// try insert set
if (!_context->hybrid_set) {
_context->ignored = true;
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
@@ -1307,10 +1310,6 @@ std::string IRuntimeFilter::formatted_state() const {
_wrapper->_context->ignored);
}

BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
// if node_id == -1 , it shouldn't be a consumer
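The reworked `merge` above centres on the `ignored` flag: an ignored source contributes nothing, a missing hybrid set marks the target ignored, and an IN filter that outgrows `_max_in_num` also flips to ignored. A rough, self-contained sketch of that policy with simplified stand-ins (the real code operates on `RuntimePredicateWrapper` and `HybridSetBase`):

```cpp
#include <cstdint>
#include <memory>
#include <unordered_set>

// Stand-in for the wrapper's shared context.
struct FilterContext {
    bool ignored = false;
    std::shared_ptr<std::unordered_set<int64_t>> hybrid_set;
};

// Rough sketch of the new IN-filter merge policy: skip ignored sources,
// mark the target ignored when there is no set to merge into, and also
// mark it ignored once the configured IN limit is exceeded.
void merge_in_filter(FilterContext& dst, const FilterContext& src, int64_t max_in_num) {
    dst.ignored = false;
    if (src.ignored) {
        return;  // nothing usable from an ignored producer
    }
    if (!dst.hybrid_set || !src.hybrid_set) {
        dst.ignored = true;  // mirrors the new "no hybrid set" guard
        return;
    }
    dst.hybrid_set->insert(src.hybrid_set->begin(), src.hybrid_set->end());
    if (max_in_num >= 0 && static_cast<int64_t>(dst.hybrid_set->size()) >= max_in_num) {
        dst.ignored = true;  // too many distinct values: stop filtering
    }
}
```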
7 changes: 0 additions & 7 deletions be/src/exprs/runtime_filter.h
@@ -197,7 +196,6 @@ class IRuntimeFilter {
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
@@ -263,8 +262,6 @@
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);

BloomFilterFuncBase* get_bloomfilter() const;

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
@@ -365,9 +362,6 @@
void to_protobuf(PInFilter* filter);
void to_protobuf(PMinMaxFilter* filter);

template <class T>
Status _update_filter(const T* param);

template <class T>
Status serialize_impl(T* request, void** data, int* len);

@@ -397,7 +391,6 @@
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
RuntimeFilterState _rf_state;
std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
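The header now keeps only the atomic copy of the filter state; the plain `_rf_state` mirror is gone. A hedged illustration of the single-atomic pattern this leaves behind (the enum values other than `NOT_READY` are assumed here, not taken from the diff):

```cpp
#include <atomic>

// NOT_READY appears in the hunk; the other states are assumed for illustration.
enum class RuntimeFilterState { NOT_READY, READY, TIMED_OUT };

// Keeping a single std::atomic member, instead of an atomic plus a plain
// mirror, removes the risk of the two copies drifting apart between threads.
class FilterStateHolder {
public:
    void set_ready() {
        _state.store(RuntimeFilterState::READY, std::memory_order_release);
    }
    bool is_ready() const {
        return _state.load(std::memory_order_acquire) == RuntimeFilterState::READY;
    }

private:
    std::atomic<RuntimeFilterState> _state {RuntimeFilterState::NOT_READY};
};
```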
12 changes: 8 additions & 4 deletions be/src/exprs/runtime_filter_slots.h
@@ -98,12 +98,16 @@ class VRuntimeFilterSlots {
return Status::OK();
}

Status ignore_all_filters() {
for (auto filter : _runtime_filters) {
filter->set_ignored();
}
return Status::OK();
}

Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter.get(), local_hash_table_size) >
state->runtime_filter_max_in_num()) {
@@ -141,7 +145,7 @@
}

// publish runtime filter
Status publish(bool publish_local = false) {
Status publish(bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
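Two small API points in this file: the new `ignore_all_filters()` helper is just a loop over the slots, and `publish` loses its default argument so every call site must state whether it publishes locally. A tiny illustrative stand-in (not the Doris classes) showing the intended call shape:

```cpp
#include <memory>
#include <vector>

// Illustrative stand-in for IRuntimeFilter, only to show the call shape.
struct FakeRuntimeFilter {
    bool ignored = false;
    void set_ignored() { ignored = true; }
};

struct FakeFilterSlots {
    std::vector<std::shared_ptr<FakeRuntimeFilter>> filters;

    // Mirrors the new helper: mark every filter ignored so consumers are
    // released without waiting for real filter contents.
    void ignore_all_filters() {
        for (auto& filter : filters) {
            filter->set_ignored();
        }
    }

    // publish(bool) with no default: each caller must say whether the
    // filters are published locally or sent on for merging.
    void publish(bool publish_local) { (void)publish_local; }
};
```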
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter.get());
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.h
@@ -61,7 +61,6 @@ class RuntimeFilterConsumer {
// set to true if this runtime filter is already applied to vconjunct_ctx_ptr
bool apply_mark = false;
std::shared_ptr<IRuntimeFilter> runtime_filter;
pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
};

std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/analytic_sink_operator.h
@@ -80,17 +80,16 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
} else {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
}

bool require_data_distribution() const override { return true; }
bool require_shuffled_data_distribution() const override {
return !_partition_by_eq_expr_ctxs.empty() && _order_by_eq_expr_ctxs.empty();
return !_partition_by_eq_expr_ctxs.empty();
}

private:
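The simplified `required_data_distribution()` now ignores the order-by expressions entirely. A compact restatement of the decision as a free function (sketch only; the enum and flag names mirror the hunk, the wrapper function itself is illustrative):

```cpp
// Sketch of the simplified rule: no partition-by exprs -> PASSTHROUGH;
// otherwise bucket-hash shuffle only when the sink is colocate, bucket
// distribution is required, and no shuffled operator follows.
enum class ExchangeType { PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

ExchangeType pick_distribution(bool has_partition_by, bool is_colocate,
                               bool require_bucket_distribution,
                               bool followed_by_shuffled_operator) {
    if (!has_partition_by) {
        return ExchangeType::PASSTHROUGH;
    }
    return (is_colocate && require_bucket_distribution && !followed_by_shuffled_operator)
                   ? ExchangeType::BUCKET_HASH_SHUFFLE
                   : ExchangeType::HASH_SHUFFLE;
}
```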
42 changes: 27 additions & 15 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -22,6 +22,7 @@
#include "exprs/bloom_filter_func.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/utils/template_helpers.hpp"

@@ -111,6 +112,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
}

Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table && p._shared_hashtable_controller) {
@@ -119,25 +123,30 @@
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
return Status::OK();
return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));

if (state->get_task()->wake_up_by_downstream()) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Status::OK();
return Base::close(state, exec_status);
}

bool HashJoinBuildSinkLocalState::build_unique() const {
@@ -504,6 +513,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos is reached or we have already met a null value using the short-circuit strategy,
// we do not need to pull data from the probe side.
@@ -555,6 +565,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
@@ -567,7 +578,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
_shared_hashtable_controller->signal(node_id());
}
} else if (!local_state._should_build_hash_table) {
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// The instance which does not build the hash table should wait for the signal that the hash table build has finished.
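The reshaped `close()` above adds two behaviours: it is idempotent (the `_closed` guard), and when the task was woken by a finished downstream it publishes empty, ignored filters instead of building them. A hedged sketch of that control flow, with fake types standing in for the runtime-filter slots and runtime state:

```cpp
#include <cstdint>

// Fake type standing in for VRuntimeFilterSlots; only the control flow
// mirrors the hunk above.
struct FakeSlots {
    void send_filter_size(uint64_t n) { (void)n; }
    void ignore_all_filters() {}
    void init_filters(uint64_t hash_table_size) { (void)hash_table_size; }
    void ignore_filters() {}
    void insert_build_block() {}
    void publish(bool publish_local) { (void)publish_local; }
};

void close_build_sink(FakeSlots& slots, bool already_closed, bool wake_up_by_downstream,
                      bool should_build_hash_table, uint64_t hash_table_size) {
    if (already_closed) {
        return;  // the new `_closed` guard makes close() safe to call twice
    }
    if (wake_up_by_downstream) {
        // Downstream finished early (e.g. a LIMIT was reached): publish empty,
        // ignored filters so consumers are not left waiting for them.
        slots.send_filter_size(0);
        slots.ignore_all_filters();
    } else {
        if (should_build_hash_table) {
            slots.init_filters(hash_table_size);
        }
        slots.ignore_filters();
        if (should_build_hash_table && hash_table_size > 1) {
            slots.insert_build_block();
        }
    }
    slots.publish(!should_build_hash_table);
    // the real code then falls through to Base::close(state, exec_status)
}
```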
15 changes: 10 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;

public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, -1, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count),
_sink(sink) {}
_cast_sender_count(sources.size()),
_sink(sink),
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
@@ -60,14 +60,19 @@
std::shared_ptr<BasicSharedState> create_shared_state() const override;

const TMultiCastDataStreamSink& sink_node() { return _sink; }
bool count_down_destination() override {
DCHECK_GT(_num_dests, 0);
return _num_dests.fetch_sub(1) == 1;
}

private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
const int _cast_sender_count;
const size_t _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
std::atomic<size_t> _num_dests;
};

} // namespace doris::pipeline
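`count_down_destination()` relies on `fetch_sub` returning the pre-decrement value, so exactly one caller, the one that takes the counter from 1 to 0, sees `1` and knows it is the last destination. A self-contained illustration of that pattern (illustrative class, not the Doris operator):

```cpp
#include <atomic>
#include <cstddef>

// fetch_sub returns the value *before* the decrement, so exactly one caller
// ever observes 1: the one that takes the counter from 1 to 0. That caller
// is the last destination and can safely finalize shared resources.
class DestinationCounter {
public:
    explicit DestinationCounter(size_t num_dests) : _num_dests(num_dests) {}

    bool count_down_destination() { return _num_dests.fetch_sub(1) == 1; }

private:
    std::atomic<size_t> _num_dests;
};
```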
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/operator.h
@@ -359,6 +359,7 @@ class PipelineXSinkLocalStateBase {
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed = false;
std::atomic<bool> _eos = false;
//NOTICE: now add a faker profile, because sometimes the profile record is useless
//so we want remove some counters and timers, eg: in join node, if it's broadcast_join
//and shared hash table, some counter/timer about build hash table is useless,
@@ -514,6 +515,8 @@ class DataSinkOperatorXBase : public OperatorBase {

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }

protected:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
@@ -259,7 +259,6 @@ class PartitionedAggSinkLocalState

std::unique_ptr<RuntimeState> _runtime_state;

bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;

// temp structures during spilling
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -54,7 +54,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha

RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

bool _eos = false;
vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
};
36 changes: 13 additions & 23 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -41,6 +41,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
_use_global_shuffle = should_disable_bucket_shuffle;
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
@@ -84,6 +85,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
_profile->add_info_string(
"UseGlobalShuffle",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
}
_channel_id = info.task_idx;
return Status::OK();
}
@@ -104,30 +110,16 @@
return Status::OK();
}

Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
RETURN_IF_ERROR(Base::close(state, exec_status));
if (exec_status.ok()) {
DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0)
<< " state: { cancel = " << state->is_cancelled() << ", "
<< state->cancel_reason().to_string() << "} query ctx: { cancel = "
<< state->get_query_ctx()->is_cancelled() << ", "
<< state->get_query_ctx()->exec_status().to_string() << "}";
}
return Status::OK();
}

std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
_release_count);
"{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
"_num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}",
Base::debug_string(indentation_level),
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
_exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators);
return fmt::to_string(debug_string_buffer);
}

@@ -140,13 +132,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

// If all exchange sources ended due to limit reached, current task should also finish
if (local_state._exchanger->_running_source_operators == 0) {
local_state._release_count = true;
local_state._shared_state->sub_running_sink_operators();
return Status::EndOfFile("receiver eof");
}
if (eos) {
local_state._shared_state->sub_running_sink_operators();
local_state._release_count = true;
}

return Status::OK();
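The sink now records its global-shuffle decision in the query profile via the new "UseGlobalShuffle" info string. A toy stand-in (the profile class here is illustrative, not Doris's `RuntimeProfile`) showing the intent:

```cpp
#include <map>
#include <string>

// Toy profile stand-in, just to show the intent of the new info string.
struct FakeProfile {
    std::map<std::string, std::string> info;
    void add_info_string(const std::string& key, const std::string& value) {
        info[key] = value;
    }
};

void record_shuffle_choice(FakeProfile& profile, bool is_hash_shuffle, bool use_global_shuffle) {
    if (is_hash_shuffle) {
        // The hunk uses std::to_string(...), which renders the flag as "1"/"0".
        profile.add_info_string("UseGlobalShuffle", use_global_shuffle ? "1" : "0");
    }
}
```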