Skip to content

Commit

Permalink
[fix](pipeline) LocalExchangeSource was closing without opening (#39237
Browse files Browse the repository at this point in the history
…) (#39264)

## Proposed changes
#39237
which led to accessing uninitialized data. Moved the profile
initialization into the init method.
```
/mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchanger.cpp:100:9: runtime error: member call on null pointer of type 'doris::RuntimeProfile::Counter'
    #0 0x563f0fb8b6c9 in doris::pipeline::Exchanger<std::shared_ptr<doris::pipeline::BlockWrapper>>::_dequeue_data(doris::pipeline::LocalExchangeSourceLocalState&, std::shared_ptr<doris::pipeline::BlockWrapper>&, bool*, doris::vectorized::Block*, int) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchanger.cpp:100:9
    #1 0x563f0fb78a45 in doris::pipeline::LocalMergeSortExchanger::finalize(doris::pipeline::LocalExchangeSourceLocalState&) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchanger.cpp:325:16
    #2 0x563f09610366 in doris::pipeline::LocalExchangeSharedState::sub_running_source_operators(doris::pipeline::LocalExchangeSourceLocalState&) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/dependency.cpp:196:20
    #3 0x563f0fb63709 in doris::pipeline::LocalExchangeSourceLocalState::close(doris::RuntimeState*) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp:59:24
    #4 0x563f09728e80 in doris::pipeline::OperatorXBase::close(doris::RuntimeState*) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/exec/operator.cpp:245:28
    #5 0x563f0fd3fb58 in doris::pipeline::PipelineTask::close(doris::Status) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/pipeline_task.cpp:459:28
    #6 0x563f0fdb5315 in doris::pipeline::_close_task(doris::pipeline::PipelineTask*, doris::Status) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/task_scheduler.cpp:91:27
    #7 0x563f0fdb6573 in doris::pipeline::TaskScheduler::_do_work(unsigned long) /mnt/disk2/yanxuecheng/doris/be/src/pipeline/task_scheduler.cpp:125:13
    #8 0x563f0fdb9d6a in doris::pipeline::TaskScheduler::start()::$_0::operator()() const /mnt/disk2/yanxuecheng/doris/be/src/pipeline/task_scheduler.cpp:64:9
    #9 0x563f0fdb9cee in void std::__invoke_impl<void, doris::pipeline::TaskScheduler::start()::$_0&>(std::__invoke_other, doris::pipeline::TaskScheduler::start()::$_0&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61:14
    #10 0x563f0fdb9c4e in std::enable_if<is_invocable_r_v<void, doris::pipeline::TaskScheduler::start()::$_0&>, void>::type std::__invoke_r<void, doris::pipeline::TaskScheduler::start()::$_0&>(doris::pipeline::TaskScheduler::start()::$_0&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:111:2
    #11 0x563f0fdb99d5 in std::_Function_handler<void (), doris::pipeline::TaskScheduler::start()::$_0>::_M_invoke(std::_Any_data const&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290:9
    #12 0x563ebdddc8cf in std::function<void ()>::operator()() const /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591:9
    #13 0x563ec4bd6db4 in doris::FunctionRunnable::run() /mnt/disk2/yanxuecheng/doris/be/src/util/threadpool.cpp:48:27
    #14 0x563ec4bbc1b5 in doris::ThreadPool::dispatch_thread() /mnt/disk2/yanxuecheng/doris/be/src/util/threadpool.cpp:543:24
    #15 0x563ec4bf9a53 in void std::__invoke_impl<void, void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(std::__invoke_memfun_deref, void (doris::ThreadPool::*&)(), doris::ThreadPool*&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:74:14
    #16 0x563ec4bf9858 in std::__invoke_result<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>::type std::__invoke<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(void (doris::ThreadPool::*&)(), doris::ThreadPool*&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96:14
    #17 0x563ec4bf9790 in void std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>::__call<void, 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:506:11
    #18 0x563ec4bf9585 in void std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>::operator()<void>() /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:591:17
    #19 0x563ec4bf947e in void std::__invoke_impl<void, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&>(std::__invoke_other, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61:14
    #20 0x563ec4bf93be in std::enable_if<is_invocable_r_v<void, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&>, void>::type std::__invoke_r<void, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&>(std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:111:2
    #21 0x563ec4bf8e55 in std::_Function_handler<void (), std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>>::_M_invoke(std::_Any_data const&) /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290:9
    #22 0x563ebdddc8cf in std::function<void ()>::operator()() const /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591:9
    #23 0x563ec4b78d91 in doris::Thread::supervise_thread(void*) /mnt/disk2/yanxuecheng/doris/be/src/util/thread.cpp:498:5
    #24 0x563ebdb2fe0a in asan_thread_start(void*) crtstuff.c
    #25 0x7feeac9e21c9 in start_thread (/lib64/libpthread.so.0+0x81c9) (BuildId: 823fccea3475e5870a4167dfe47df20e53222db0)
    #26 0x7feead3d1e72 in clone (/lib64/libc.so.6+0x39e72) (BuildId: ec3d7025354f1f1985831ff08ef0eb3b50aefbce)

SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior /mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchanger.cpp:100:9 in 
*** Query id: ea174401bc134452-bd8a35522726a96a ***
*** is nereids: 1 ***
*** tablet id: 0 ***
*** Aborted at 1723455006 (unix time) try "date -d @1723455006" if you are using GNU date ***
*** Current BE git commitID: c47399c ***
*** SIGSEGV address not mapped to object (@0x0) received by PID 3055435 (TID 3060341 OR 0x7fe320acf700) from PID 0; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /mnt/disk2/yanxuecheng/doris/be/src/common/signal_handler.h:421
 1# 0x00007FEEAD3E6B50 in /lib64/libc.so.6
 2# doris::pipeline::Exchanger<std::shared_ptr<doris::pipeline::BlockWrapper> >::_dequeue_data(doris::pipeline::LocalExchangeSourceLocalState&, std::shared_ptr<doris::pipeline::BlockWrapper>&, bool*, doris::vectorized::Block*, int) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchanger.cpp:100
 3# doris::pipeline::LocalMergeSortExchanger::finalize(doris::pipeline::LocalExchangeSourceLocalState&) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/local_exchange/local_exchanger.cpp:325
 4# doris::pipeline::LocalExchangeSharedState::sub_running_source_operators(doris::pipeline::LocalExchangeSourceLocalState&) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/dependency.cpp:196
 5# doris::pipeline::LocalExchangeSourceLocalState::close(doris::RuntimeState*) in /mnt/disk2/yanxuecheng/doris/output/be/lib/doris_be
 6# doris::pipeline::OperatorXBase::close(doris::RuntimeState*) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/exec/operator.cpp:245
 7# doris::pipeline::PipelineTask::close(doris::Status) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/pipeline_task.cpp:459
 8# doris::pipeline::_close_task(doris::pipeline::PipelineTask*, doris::Status) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/task_scheduler.cpp:91
 9# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/task_scheduler.cpp:125
10# doris::pipeline::TaskScheduler::start()::$_0::operator()() const at /mnt/disk2/yanxuecheng/doris/be/src/pipeline/task_scheduler.cpp:64
11# void std::__invoke_impl<void, doris::pipeline::TaskScheduler::start()::$_0&>(std::__invoke_other, doris::pipeline::TaskScheduler::start()::$_0&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61
12# std::enable_if<is_invocable_r_v<void, doris::pipeline::TaskScheduler::start()::$_0&>, void>::type std::__invoke_r<void, doris::pipeline::TaskScheduler::start()::$_0&>(doris::pipeline::TaskScheduler::start()::$_0&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117
13# std::_Function_handler<void (), doris::pipeline::TaskScheduler::start()::$_0>::_M_invoke(std::_Any_data const&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
14# std::function<void ()>::operator()() const at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591
15# doris::FunctionRunnable::run() at /mnt/disk2/yanxuecheng/doris/be/src/util/threadpool.cpp:48
16# doris::ThreadPool::dispatch_thread() at /mnt/disk2/yanxuecheng/doris/be/src/util/threadpool.cpp:543
17# void std::__invoke_impl<void, void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(std::__invoke_memfun_deref, void (doris::ThreadPool::*&)(), doris::ThreadPool*&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:74
18# std::__invoke_result<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>::type std::__invoke<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(void (doris::ThreadPool::*&)(), doris::ThreadPool*&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96
19# void std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:506
20# void std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>::operator()<, void>() at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:591
21# void std::__invoke_impl<void, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::__invoke_other, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61
22# std::enable_if<is_invocable_r_v<void, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&>, void>::type std::__invoke_r<void, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117
23# std::_Function_handler<void (), std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()> >::_M_invoke(std::_Any_data const&) at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
24# std::function<void ()>::operator()() const at /mnt/disk2/yanxuecheng/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591
25# doris::Thread::supervise_thread(void*) at /mnt/disk2/yanxuecheng/doris/be/src/util/thread.cpp:498
26# asan_thread_start(void*) in /mnt/disk2/yanxuecheng/doris/output/be/lib/doris_be
27# start_thread in /lib64/libpthread.so.0
28# __clone in /lib64/libc.so.6

```

<!--Describe your changes.-->

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
Mryange authored Aug 13, 2024
1 parent ef2f469 commit 217e037
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(_init_timer);
_channel_id = info.task_idx;
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
return Status::OK();
}

Status LocalExchangeSourceLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));

_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExch
: Base(state, parent) {}

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const override;

Expand Down

0 comments on commit 217e037

Please sign in to comment.