Skip to content

Commit

Permalink
[pipelineX](runtime filter) Fix task timeout caused by runtime filter (
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Apr 8, 2024
1 parent d60d804 commit a8232c6
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 214 deletions.
23 changes: 20 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
_profile->add_info_string("Info", _format_status());
_profile->add_info_string("Info", formatted_state());
// The runtime filter is pushed down, adding filtering information.
auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT);
auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT);
Expand Down Expand Up @@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
return true;
}

void IRuntimeFilter::update_state() {
DCHECK(is_consumer());
auto execution_timeout = _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
: runtime_filter_wait_time_ms;
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
DCHECK(_enable_pipeline_exec);
// In pipelineX, runtime filters will be ready or timeout before open phase.
if (expected == RuntimeFilterState::NOT_READY) {
DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
_rf_state_atomic = RuntimeFilterState::TIME_OUT;
}
}

// NOTE: Wait infinitely will not make scan task wait really forever.
// Because BlockTaskSchedule will make it run when query is timedout.
bool IRuntimeFilter::wait_infinitely() const {
Expand Down Expand Up @@ -1236,7 +1253,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {
_wrapper->_ignored_msg = msg;
}

std::string IRuntimeFilter::_format_status() const {
std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}]",
Expand Down Expand Up @@ -1411,7 +1428,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
} else {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
_profile->add_info_string("Info", formatted_state());
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class IRuntimeFilter {
// This function will wait at most config::runtime_filter_shuffle_wait_time_ms
// if return true , filter is ready to use
bool await();
void update_state();
// this function will be called if a runtime filter sent by rpc
// it will notify all wait threads
void signal();
Expand Down Expand Up @@ -355,6 +356,7 @@ class IRuntimeFilter {
int64_t registration_time() const { return registration_time_; }

void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
std::string formatted_state() const;

protected:
// serialize _wrapper to protobuf
Expand All @@ -373,8 +375,6 @@ class IRuntimeFilter {

void _set_push_down(bool push_down) { _is_push_down = push_down; }

std::string _format_status() const;

std::string _get_explain_state_string() const {
if (_enable_pipeline_exec) {
return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY
Expand Down
27 changes: 21 additions & 6 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
}
return _acquire_runtime_filter();
return _acquire_runtime_filter(false);
}

bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
Expand Down Expand Up @@ -129,10 +129,7 @@ MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
};
}

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
Expand All @@ -145,12 +142,30 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
}
_wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
init_runtime_filter_dependency(_filter_dependency.get());
init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
p.get_name() + "_FILTER_DEPENDENCY");
return Status::OK();
}

Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}

SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
int64_t rf_time = 0;
for (auto& dep : _filter_dependencies) {
rf_time += dep->watcher_elapse_time();
}
COUNTER_SET(_wait_for_rf_timer, rf_time);

return Base::close(state);
}

Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block, bool* eos) {
//auto& local_state = get_local_state(state);
Expand Down
19 changes: 16 additions & 3 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,29 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<Mul

Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_acquire_runtime_filter(true));
return Status::OK();
}
Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }
std::vector<Dependency*> filter_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res;
res.resize(_filter_dependencies.size());
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
res[i] = _filter_dependencies[i].get();
}
return res;
}

private:
vectorized::VExprContextSPtrs _output_expr_contexts;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;

RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};

class MultiCastDataStreamerSourceOperatorX final
Expand Down
21 changes: 9 additions & 12 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ std::string ScanOperator::debug_string() const {
return; \
}

template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}

template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
return !_scanner_ctx->empty_in_queue(0);
Expand Down Expand Up @@ -133,7 +125,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
init_runtime_filter_dependency(_filter_dependency.get());
init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
p.get_name() + "_FILTER_DEPENDENCY");

// 1: running at not pipeline mode will init profile.
// 2: the scan node should create scanner at pipeline mode will init profile.
Expand All @@ -156,7 +149,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
if (_opened) {
return Status::OK();
}
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_acquire_runtime_filter(true));
RETURN_IF_ERROR(_process_conjuncts());

auto status = _eos ? Status::OK() : _prepare_scanners();
Expand Down Expand Up @@ -1412,7 +1405,11 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
return Status::OK();
}
COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time());
int64_t rf_time = 0;
for (auto& dep : _filter_dependencies) {
rf_time += dep->watcher_elapse_time();
}
COUNTER_UPDATE(exec_time_counter(), rf_time);
SCOPED_TIMER(_close_timer);

SCOPED_TIMER(exec_time_counter());
Expand All @@ -1421,7 +1418,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
}
std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, rf_time);

return PipelineXLocalState<>::close(state);
}
Expand Down
17 changes: 14 additions & 3 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ class ScanOperatorX;
template <typename Derived>
class ScanLocalState : public ScanLocalStateBase {
ENABLE_FACTORY_CREATOR(ScanLocalState);
ScanLocalState(RuntimeState* state, OperatorXBase* parent);
ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;

Status init(RuntimeState* state, LocalStateInfo& info) override;
Expand Down Expand Up @@ -165,7 +166,17 @@ class ScanLocalState : public ScanLocalStateBase {

int64_t get_push_down_count() override;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
std::vector<Dependency*> filter_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res;
res.resize(_filter_dependencies.size());
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
res[i] = _filter_dependencies[i].get();
}
return res;
}

std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }

Expand Down Expand Up @@ -364,7 +375,7 @@ class ScanLocalState : public ScanLocalStateBase {

std::mutex _block_lock;

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;

// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
Expand Down
112 changes: 38 additions & 74 deletions be/src/pipeline/pipeline_x/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,6 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
return ready ? nullptr : this;
}

Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
if (!_blocked_by_rf) {
return nullptr;
}
std::unique_lock<std::mutex> lc(_task_lock);
if (*_blocked_by_rf && !_is_cancelled()) {
if (LIKELY(task)) {
_add_block_task(task);
}
return this;
}
return nullptr;
}

std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
Expand All @@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) {

std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}

bool RuntimeFilterTimer::has_ready() {
std::unique_lock<std::mutex> lc(_lock);
return _is_ready;
Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load() || _is_cancelled();
if (!ready && task) {
_add_block_task(task);
task->_blocked_dep = this;
}
return ready ? nullptr : this;
}

void RuntimeFilterTimer::call_timeout() {
std::unique_lock<std::mutex> lc(_lock);
if (_call_ready) {
return;
}
_call_timeout = true;
if (_parent) {
_parent->sub_filters(_filter_id);
}
_parent->set_ready();
}

void RuntimeFilterTimer::call_ready() {
std::unique_lock<std::mutex> lc(_lock);
if (_call_timeout) {
return;
}
_call_ready = true;
if (_parent) {
_parent->sub_filters(_filter_id);
}
_is_ready = true;
}

void RuntimeFilterTimer::call_has_ready() {
std::unique_lock<std::mutex> lc(_lock);
DCHECK(!_call_timeout);
if (!_call_ready) {
_parent->sub_filters(_filter_id);
}
_parent->set_ready();
}

void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
const auto filter_id = runtime_filter->filter_id();
;
_filters++;
_filter_ready_map[filter_id] = false;
int64_t registration_time = runtime_filter->registration_time();
int32 wait_time_ms = runtime_filter->wait_time_ms();
auto filter_timer = std::make_shared<RuntimeFilterTimer>(
filter_id, registration_time, wait_time_ms,
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
runtime_filter->set_filter_timer(filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
void RuntimeFilterTimerQueue::start() {
while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);

void RuntimeFilterDependency::sub_filters(int id) {
std::vector<PipelineXTask*> local_block_task {};
{
std::lock_guard<std::mutex> lk(_task_lock);
if (!_filter_ready_map[id]) {
_filter_ready_map[id] = true;
_filters--;
while (_que.empty() && !_stop) {
cv.wait_for(lk, std::chrono::seconds(3), [this] { return !_que.empty() || _stop; });
}
if (_stop) {
break;
}
if (_filters == 0) {
_watcher.stop();
{
*_blocked_by_rf = false;
local_block_task.swap(_blocked_task);
{
std::unique_lock<std::mutex> lc(_que_lock);
std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
}
}
new_que.swap(_que);
}
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
for (auto* task : local_block_task) {
task->wake_up();
}
_shutdown = true;
}

void LocalExchangeSharedState::sub_running_sink_operators() {
Expand Down
Loading

0 comments on commit a8232c6

Please sign in to comment.