Skip to content

Commit

Permalink
[Bug](runtime-filter) send ignored rf when hash join build closed ear…
Browse files Browse the repository at this point in the history
…ly (#41667)

## Proposed changes
send ignored rf when hash join build closed early to avoid runtime
filter sync/merge error
Follow-up : #41292
  • Loading branch information
BiteTheDDDDt authored Oct 11, 2024
1 parent 68b5bd7 commit 8be40e2
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
9 changes: 6 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,10 @@ class RuntimePredicateWrapper {
const TExpr& probe_expr);

Status merge(const RuntimePredicateWrapper* wrapper) {
if (is_ignored() || wrapper->is_ignored()) {
_context->ignored = true;
if (wrapper->is_ignored()) {
return Status::OK();
}
_context->ignored = false;

bool can_not_merge_in_or_bloom =
_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
Expand All @@ -493,7 +493,10 @@ class RuntimePredicateWrapper {

switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
// try insert set
if (!_context->hybrid_set) {
_context->ignored = true;
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
Expand Down
12 changes: 8 additions & 4 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ class VRuntimeFilterSlots {
return Status::OK();
}

Status ignore_all_filters() {
for (auto filter : _runtime_filters) {
filter->set_ignored();
}
return Status::OK();
}

Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter.get(), local_hash_table_size) >
state->runtime_filter_max_in_num()) {
Expand Down Expand Up @@ -141,7 +145,7 @@ class VRuntimeFilterSlots {
}

// publish runtime filter
Status publish(bool publish_local = false) {
Status publish(bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
Expand Down
29 changes: 17 additions & 12 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));

if (!_eos) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Base::close(state, exec_status);
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
build_bf_exactly, true));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
iter->second.merge_time++;
Expand Down Expand Up @@ -234,6 +235,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options,
-1, false));
cnt_val->filter->set_ignored();
_filter_map.emplace(filter_id, cnt_val);
return Status::OK();
}
Expand All @@ -252,6 +254,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options));
cnt_val->filter->set_ignored();

std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
_filter_map.emplace(filter_id, cnt_val);
Expand Down

0 comments on commit 8be40e2

Please sign in to comment.