From a9ca737c5ac656d5799bb0d70f96c0171faff609 Mon Sep 17 00:00:00 2001 From: Pxl Date: Fri, 11 Oct 2024 18:07:55 +0800 Subject: [PATCH] [Bug](runtime-filter) send ignored rf when hash join build closed early (#41667) send ignored rf when hash join build closed early to avoid runtime filter sync/merge error Follow-up : https://github.com/apache/doris/pull/41292 --- be/src/exprs/runtime_filter.cpp | 9 ++++-- be/src/exprs/runtime_filter_slots.h | 12 +++++--- be/src/pipeline/exec/hashjoin_build_sink.cpp | 29 ++++++++++++-------- be/src/runtime/runtime_filter_mgr.cpp | 3 ++ 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index ae0fd05bc2a1d9..1c34b5ab283f55 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -468,10 +468,10 @@ class RuntimePredicateWrapper { const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - if (is_ignored() || wrapper->is_ignored()) { - _context->ignored = true; + if (wrapper->is_ignored()) { return Status::OK(); } + _context->ignored = false; bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && @@ -489,7 +489,10 @@ class RuntimePredicateWrapper { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - // try insert set + if (!_context->hybrid_set) { + _context->ignored = true; + return Status::OK(); + } _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { _context->ignored = true; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index ebda4b56fcc30e..b6ca31c72722ec 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -98,12 +98,16 @@ class VRuntimeFilterSlots { return Status::OK(); } + Status ignore_all_filters() { + for (auto filter : _runtime_filters) { + filter->set_ignored(); + } + return Status::OK(); + } + Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto* filter : _runtime_filters) { - if (filter->get_ignored()) { - continue; - } if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) { RETURN_IF_ERROR(filter->change_to_bloom_filter()); @@ -140,7 +144,7 @@ class VRuntimeFilterSlots { } // publish runtime filter - Status publish(bool publish_local = false) { + Status publish(bool publish_local) { for (auto& pair : _runtime_filters_map) { for (auto& filter : pair.second) { RETURN_IF_ERROR(filter->publish(publish_local)); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index b81e6c5f6aa957..46b70794021e82 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -135,20 +135,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { return Base::close(state, exec_status); } - auto* block = _shared_state->build_block.get(); - uint64_t hash_table_size = block ? block->rows() : 0; - { - SCOPED_TIMER(_runtime_filter_init_timer); - if (_should_build_hash_table) { - RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + + if (!_eos) { + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); + } else { + auto* block = _shared_state->build_block.get(); + uint64_t hash_table_size = block ? block->rows() : 0; + { + SCOPED_TIMER(_runtime_filter_init_timer); + if (_should_build_hash_table) { + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + } + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + if (_should_build_hash_table && hash_table_size > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(block); } - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); - } - if (_should_build_hash_table && hash_table_size > 1) { - SCOPED_TIMER(_runtime_filter_compute_timer); - _runtime_filter_slots->insert(block); } - SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); return Base::close(state, exec_status); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index bc3416d053aff8..640cece8fb3387 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -145,6 +145,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, RuntimeFilterRole::PRODUCER, -1, &merge_filter, build_bf_exactly, true)); + merge_filter->set_ignored(); iter->second.filters.emplace_back(merge_filter); } iter->second.merge_time++; @@ -252,6 +253,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1, false)); + cnt_val->filter->set_ignored(); _filter_map.emplace(filter_id, cnt_val); return Status::OK(); } @@ -271,6 +273,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options)); + cnt_val->filter->set_ignored(); std::unique_lock guard(_filter_map_mutex); _filter_map.emplace(filter_id, cnt_val);