
Commit

Merge branch 'master' into 20240920_fix_memory
xinyiZzz authored Oct 11, 2024
2 parents c6f5d97 + 0d07e3d commit 3986e29
Showing 182 changed files with 6,646 additions and 4,251 deletions.
15 changes: 8 additions & 7 deletions be/src/exprs/runtime_filter.cpp
@@ -472,10 +472,10 @@ class RuntimePredicateWrapper {
const TExpr& probe_expr);

Status merge(const RuntimePredicateWrapper* wrapper) {
if (is_ignored() || wrapper->is_ignored()) {
_context->ignored = true;
if (wrapper->is_ignored()) {
return Status::OK();
}
_context->ignored = false;

bool can_not_merge_in_or_bloom =
_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -493,7 +493,10 @@ class RuntimePredicateWrapper {

switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
// try to insert into the set
if (!_context->hybrid_set) {
_context->ignored = true;
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
@@ -1144,6 +1147,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
request->set_filter_size(local_filter_size);
request->set_filter_id(_filter_id);
callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000);
callback->cntl_->ignore_eovercrowded();

stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(),
closure.get());
@@ -1181,6 +1185,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
auto column_type = _wrapper->column_type();
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
merge_filter_callback->cntl_->ignore_eovercrowded();

if (get_ignored()) {
merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
@@ -1307,10 +1312,6 @@ std::string IRuntimeFilter::formatted_state() const {
_wrapper->_context->ignored);
}

BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
    // if node_id == -1, it shouldn't be a consumer
7 changes: 0 additions & 7 deletions be/src/exprs/runtime_filter.h
@@ -198,7 +198,6 @@ class IRuntimeFilter {
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
@@ -264,8 +263,6 @@ class IRuntimeFilter {
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);

BloomFilterFuncBase* get_bloomfilter() const;

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
@@ -366,9 +363,6 @@ class IRuntimeFilter {
void to_protobuf(PInFilter* filter);
void to_protobuf(PMinMaxFilter* filter);

template <class T>
Status _update_filter(const T* param);

template <class T>
Status serialize_impl(T* request, void** data, int* len);

@@ -398,7 +392,6 @@ class IRuntimeFilter {
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
RuntimeFilterState _rf_state;
std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
12 changes: 8 additions & 4 deletions be/src/exprs/runtime_filter_slots.h
@@ -98,12 +98,16 @@ class VRuntimeFilterSlots {
return Status::OK();
}

Status ignore_all_filters() {
for (auto filter : _runtime_filters) {
filter->set_ignored();
}
return Status::OK();
}

Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter.get(), local_hash_table_size) >
state->runtime_filter_max_in_num()) {
@@ -141,7 +145,7 @@ class VRuntimeFilterSlots {
}

// publish runtime filter
Status publish(bool publish_local = false) {
Status publish(bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
1 change: 1 addition & 0 deletions be/src/olap/partial_update_info.h
@@ -66,6 +66,7 @@ struct PartialUpdateInfo {
case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
return "flexible partial update";
}
return "";
}
bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; }
bool is_fixed_partial_update() const {
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter.get());
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.h
@@ -61,7 +61,6 @@ class RuntimeFilterConsumer {
// set to true if this runtime filter is already applied to vconjunct_ctx_ptr
bool apply_mark = false;
std::shared_ptr<IRuntimeFilter> runtime_filter;
pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
};

std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
33 changes: 20 additions & 13 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -138,20 +138,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));

if (!_eos) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Base::close(state, exec_status);
@@ -590,6 +595,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
@@ -601,7 +607,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->build_indexes_null;
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
}
} else if (!local_state._should_build_hash_table) {
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
        // An instance that does not build the hash table should wait for the signal that the hash table build has finished.
15 changes: 10 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;

public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, -1, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count),
_sink(sink) {}
_cast_sender_count(sources.size()),
_sink(sink),
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
@@ -60,14 +60,19 @@ class MultiCastDataStreamSinkOperatorX final
std::shared_ptr<BasicSharedState> create_shared_state() const override;

const TMultiCastDataStreamSink& sink_node() { return _sink; }
bool count_down_destination() override {
DCHECK_GT(_num_dests, 0);
return _num_dests.fetch_sub(1) == 1;
}

private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
const int _cast_sender_count;
const size_t _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
std::atomic<size_t> _num_dests;
};

} // namespace doris::pipeline
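
Note on the change above: the sender count is now derived from sources.size() and mirrored in an atomic _num_dests counter, and count_down_destination() returns true only for the last destination to finish; Pipeline::make_all_runnable (later in this diff) uses that return value to wake the pipeline's tasks exactly once. A minimal, self-contained sketch of the pattern (hypothetical names, not the Doris classes):

#include <atomic>
#include <cstddef>
#include <iostream>
#include <vector>

// Sketch only: each destination that finishes calls count_down_destination();
// only the final call returns true, so the "wake all tasks" step runs once.
class MultiCastSinkSketch {
public:
    explicit MultiCastSinkSketch(std::size_t num_dests) : _num_dests(num_dests) {}

    // Returns true only for the caller that performs the last decrement.
    bool count_down_destination() { return _num_dests.fetch_sub(1) == 1; }

private:
    std::atomic<std::size_t> _num_dests;
};

int main() {
    MultiCastSinkSketch sink(3);
    std::vector<bool> results;
    for (int i = 0; i < 3; ++i) {
        results.push_back(sink.count_down_destination());
    }
    for (bool r : results) {
        std::cout << std::boolalpha << r << '\n'; // false, false, true
    }
}

Using fetch_sub and comparing the previous value to 1 keeps the "last one out" decision race-free even when several sink instances finish concurrently; the DCHECK_GT in the real code additionally guards against counting down past zero.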
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
@@ -512,6 +512,8 @@ class DataSinkOperatorXBase : public OperatorBase {

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }

protected:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline.cpp
@@ -67,9 +67,11 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
}

void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}
}
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -1142,8 +1142,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}

_sink.reset(new MultiCastDataStreamSinkOperatorX(
sink_id, sources, cast_set<int>(thrift_sink.multi_cast_stream_sink.sinks.size()),
pool, thrift_sink.multi_cast_stream_sink, row_desc));
sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc));
for (int i = 0; i < sender_size; ++i) {
auto new_pipeline = add_pipeline();
RowDescriptor* _row_desc = nullptr;
23 changes: 17 additions & 6 deletions be/src/pipeline/pipeline_task.cpp
@@ -227,14 +227,14 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
return true;
return !_wake_up_by_downstream;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
@@ -249,7 +249,7 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
// If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
@@ -268,7 +268,7 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
@@ -306,6 +306,11 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}
// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
RETURN_IF_ERROR(_open());
@@ -315,6 +320,11 @@ Status PipelineTask::execute(bool* eos) {
if (_is_blocked()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
@@ -481,9 +491,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
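
Note on the changes above: the blocking checks in _wait_to_start() and _is_blocked() now return !_wake_up_by_downstream instead of unconditionally returning true, and execute() gains an early-out that reports EOS once the flag is set, so a task whose downstream no longer needs data finishes instead of waiting on its dependencies. A minimal sketch of that control flow (hypothetical names, single-threaded for clarity, not the Doris classes):

#include <atomic>
#include <iostream>

// Sketch only: a task whose blocking checks are overridden by a
// "woken by downstream" flag, so a task whose output is no longer
// needed reports EOS instead of staying blocked.
struct TaskSketch {
    std::atomic<bool> wake_up_by_downstream{false};
    bool dependency_ready = false;
    bool eos = false;

    // Mirrors the pattern in _wait_to_start()/_is_blocked(): a pending
    // dependency only blocks the task if downstream still wants data.
    bool is_blocked() const {
        if (!dependency_ready) {
            return !wake_up_by_downstream.load();
        }
        return false;
    }

    // Mirrors the early-out added to execute(): once woken by downstream,
    // the task finishes immediately with eos = true.
    void execute() {
        if (is_blocked()) {
            return; // yield; the scheduler would retry later
        }
        if (wake_up_by_downstream.load()) {
            eos = true;
            return;
        }
        // ... normal processing would go here ...
    }
};

int main() {
    TaskSketch task;
    task.execute();                    // blocked: dependency not ready
    task.wake_up_by_downstream = true; // downstream no longer needs data
    task.execute();                    // no longer blocked, reports EOS
    std::cout << std::boolalpha << task.eos << '\n'; // true
}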
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
@@ -90,7 +90,7 @@ class PipelineTask {
_blocked_dep = fin_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_filter_mgr.cpp
@@ -129,6 +129,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
build_bf_exactly, true));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
iter->second.merge_time++;
@@ -234,6 +235,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options,
-1, false));
cnt_val->filter->set_ignored();
_filter_map.emplace(filter_id, cnt_val);
return Status::OK();
}
@@ -252,6 +254,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options));
cnt_val->filter->set_ignored();

std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
_filter_map.emplace(filter_id, cnt_val);
@@ -342,6 +345,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
closure->cntl_->ignore_eovercrowded();

closure->request_->set_filter_id(filter_id);
closure->request_->set_filter_size(cnt_val->global_size);
@@ -454,6 +458,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
closure->cntl_->request_attachment().append(request_attachment);
}
closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
closure->cntl_->ignore_eovercrowded();
// set fragment-id
for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
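
Note on the changes above: each brpc controller used for runtime-filter RPCs now gets ignore_eovercrowded() in addition to the capped timeout, so congestion on the connection does not fail the call with EOVERCROWDED. A minimal sketch of the controller setup (assumes brpc headers are available; configure_controller is a hypothetical helper, not a Doris function):

#include <algorithm>
#include <brpc/controller.h>

// Hypothetical helper: configure a brpc controller the same way the diff does
// before issuing an asynchronous runtime-filter RPC.
void configure_controller(brpc::Controller* cntl, int execution_timeout_s) {
    // Cap the RPC timeout at one hour, matching send_filter_size()/merge().
    cntl->set_timeout_ms(std::min(3600, execution_timeout_s) * 1000);
    // Keep the call alive under connection congestion instead of failing it
    // with EOVERCROWDED; runtime-filter RPCs prefer to wait.
    cntl->ignore_eovercrowded();
}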
