Skip to content

Commit

Permalink
[Bug](runtime-filter) send ignored rf when hash join build closed ear…
Browse files Browse the repository at this point in the history
…ly (apache#41667)

Send an ignored runtime filter (rf) when the hash join build is closed early, to avoid
runtime filter sync/merge errors.
Follow-up : apache#41292
  • Loading branch information
BiteTheDDDDt authored and Gabriel39 committed Oct 16, 2024
1 parent 0317940 commit a9ca737
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
9 changes: 6 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,10 @@ class RuntimePredicateWrapper {
const TExpr& probe_expr);

Status merge(const RuntimePredicateWrapper* wrapper) {
if (is_ignored() || wrapper->is_ignored()) {
_context->ignored = true;
if (wrapper->is_ignored()) {
return Status::OK();
}
_context->ignored = false;

bool can_not_merge_in_or_bloom =
_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
Expand All @@ -489,7 +489,10 @@ class RuntimePredicateWrapper {

switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
// try insert set
if (!_context->hybrid_set) {
_context->ignored = true;
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
Expand Down
12 changes: 8 additions & 4 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ class VRuntimeFilterSlots {
return Status::OK();
}

// Marks every runtime filter in _runtime_filters as ignored by calling
// set_ignored() on each entry. Always returns Status::OK().
Status ignore_all_filters() {
    for (auto* rf : _runtime_filters) {
        rf->set_ignored();
    }
    return Status::OK();
}

Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto* filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) {
RETURN_IF_ERROR(filter->change_to_bloom_filter());
Expand Down Expand Up @@ -140,7 +144,7 @@ class VRuntimeFilterSlots {
}

// publish runtime filter
Status publish(bool publish_local = false) {
Status publish(bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
Expand Down
29 changes: 17 additions & 12 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));

if (!_eos) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Base::close(state, exec_status);
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
build_bf_exactly, true));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
iter->second.merge_time++;
Expand Down Expand Up @@ -252,6 +253,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options,
-1, false));
cnt_val->filter->set_ignored();
_filter_map.emplace(filter_id, cnt_val);
return Status::OK();
}
Expand All @@ -271,6 +273,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc));
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options));
cnt_val->filter->set_ignored();

std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
_filter_map.emplace(filter_id, cnt_val);
Expand Down

0 comments on commit a9ca737

Please sign in to comment.