From e2de5fc88dbd1730219c54af60faa0019b65e544 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 15 Mar 2023 18:08:57 +0800 Subject: [PATCH] Fix event loop thread might exit unexpectedly (#217) Fixes https://github.com/apache/pulsar-client-cpp/issues/209 ### Motivation When the event loop thread is started from `ExecutorService::restart`, there is a chance that `io_service_.run(ec)` returns immediately. In this case, the `ClientConnection::resolver_` that was registered in the IO service could block forever in `async_resolve` method. ### Modifications In the event loop thread, if the `io_service::run` method is not stopped by `ExecutorService::close`, just call `restart` and `run` again to avoid the event loop thread exits unexpectedly. Run the `ConnectionFailTest` independently with `--gtest_repeat=20` to avoid it's still flaky. --- .github/workflows/ci-pr-validation.yaml | 3 +++ lib/ExecutorService.cc | 29 ++++++++++--------------- run-unit-tests.sh | 2 -- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index af7dd8a5..1bb7cb38 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -96,6 +96,9 @@ jobs: - name: Start Pulsar service run: ./pulsar-test-service-start.sh + - name: Run ConnectionFailTest + run: ./tests/ConnectionFailTest --gtest_repeat=20 + - name: Run unit tests run: RETRY_FAILED=3 ./run-unit-tests.sh diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc index 0c844636..a0dff0b6 100644 --- a/lib/ExecutorService.cc +++ b/lib/ExecutorService.cc @@ -30,22 +30,24 @@ ExecutorService::~ExecutorService() { close(0); } void ExecutorService::start() { auto self = shared_from_this(); - std::thread t{[self] { + std::thread t{[this, self] { LOG_DEBUG("Run io_service in a single thread"); boost::system::error_code ec; - IOService::work work_{self->getIOService()}; - self->getIOService().run(ec); + while (!closed_) { + io_service_.restart(); + IOService::work work{getIOService()}; + io_service_.run(ec); + } if (ec) { LOG_ERROR("Failed to run io_service: " << ec.message()); } else { LOG_DEBUG("Event loop of ExecutorService exits successfully"); } - { - std::lock_guard lock{self->mutex_}; - self->ioServiceDone_ = true; + std::lock_guard lock{mutex_}; + ioServiceDone_ = true; } - self->cond_.notify_all(); + cond_.notify_all(); }}; t.detach(); } @@ -75,7 +77,7 @@ SocketPtr ExecutorService::createSocket() { } TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) { - return std::shared_ptr >( + return std::shared_ptr>( new boost::asio::ssl::stream(*socket, ctx)); } @@ -103,16 +105,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() { } } -void ExecutorService::restart() { - close(-1); // make sure it's closed - closed_ = false; - { - std::lock_guard lock{mutex_}; - ioServiceDone_ = false; - } - io_service_.restart(); - start(); -} +void ExecutorService::restart() { io_service_.stop(); } void ExecutorService::close(long timeoutMs) { bool expectedState = false; diff --git a/run-unit-tests.sh b/run-unit-tests.sh index 5ff8fef7..a5489a73 100755 --- a/run-unit-tests.sh +++ b/run-unit-tests.sh @@ -25,8 +25,6 @@ cd $ROOT_DIR pushd tests -./ConnectionFailTest - export RETRY_FAILED="${RETRY_FAILED:-1}" if [ -f /gtest-parallel ]; then