Fix event loop thread might exit unexpectedly (#217)
Fixes #209

### Motivation

When the event loop thread is started from `ExecutorService::restart`,
there is a chance that `io_service_.run(ec)` returns immediately. In
this case, the `ClientConnection::resolver_` that was registered in the
IO service could block forever in the `async_resolve` method.

### Modifications

In the event loop thread, if the `io_service::run` method returns without
having been stopped by `ExecutorService::close`, call `restart` and `run`
again so that the event loop thread does not exit unexpectedly.
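
The retry loop described above can be sketched without Boost. This is a minimal, single-threaded illustration only: `ToyIoService`, `runUntilClosed`, and `demoStaleStop` are hypothetical names that are not part of this commit or of Boost.Asio, and the toy `run()` returns when it is stopped or out of tasks instead of blocking on a work guard.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for an io_service (NOT the Boost.Asio class).
// Single-threaded on purpose so the behaviour is deterministic.
class ToyIoService {
   public:
    void post(std::function<void()> task) { tasks_.push_back(std::move(task)); }
    void stop() { stopped_ = true; }
    void restart() { stopped_ = false; }

    // Runs queued tasks until stopped or the queue is empty.
    // Returns the number of tasks executed by this call.
    int run() {
        int executed = 0;
        while (!stopped_ && !tasks_.empty()) {
            auto task = std::move(tasks_.front());
            tasks_.pop_front();
            task();
            ++executed;
        }
        return executed;
    }

   private:
    std::deque<std::function<void()>> tasks_;
    bool stopped_ = false;
};

// Loop in the style of the fix: as long as the owner has not been closed,
// clear the stopped state and run again instead of leaving the loop.
int runUntilClosed(ToyIoService& io, const std::atomic<bool>& closed) {
    int handled = 0;
    while (!closed) {
        io.restart();
        handled += io.run();
    }
    return handled;
}

// Usage sketch: a stop() that did not come from close() makes a bare run()
// return immediately, while the loop still drains both queued tasks.
int demoStaleStop() {
    ToyIoService io;
    std::atomic<bool> closed{false};
    io.post([] {});
    io.post([&closed] { closed = true; });  // plays the role of close()
    io.stop();
    assert(io.run() == 0);  // nothing is executed while stopped
    return runUntilClosed(io, closed);
}
```

In this sketch the second task sets `closed`, which stands in for `ExecutorService::close`; until that happens, a stopped `run()` is simply restarted.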

Run `ConnectionFailTest` independently with `--gtest_repeat=20` to
verify that it is no longer flaky.
BewareMyPower authored Mar 15, 2023
1 parent 2108d54 commit e2de5fc
Showing 3 changed files with 14 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci-pr-validation.yaml
@@ -96,6 +96,9 @@ jobs:
       - name: Start Pulsar service
         run: ./pulsar-test-service-start.sh
 
+      - name: Run ConnectionFailTest
+        run: ./tests/ConnectionFailTest --gtest_repeat=20
+
       - name: Run unit tests
         run: RETRY_FAILED=3 ./run-unit-tests.sh
 
29 changes: 11 additions & 18 deletions lib/ExecutorService.cc
@@ -30,22 +30,24 @@ ExecutorService::~ExecutorService() { close(0); }
 
 void ExecutorService::start() {
     auto self = shared_from_this();
-    std::thread t{[self] {
+    std::thread t{[this, self] {
         LOG_DEBUG("Run io_service in a single thread");
         boost::system::error_code ec;
-        IOService::work work_{self->getIOService()};
-        self->getIOService().run(ec);
+        while (!closed_) {
+            io_service_.restart();
+            IOService::work work{getIOService()};
+            io_service_.run(ec);
+        }
         if (ec) {
             LOG_ERROR("Failed to run io_service: " << ec.message());
         } else {
             LOG_DEBUG("Event loop of ExecutorService exits successfully");
         }
-
         {
-            std::lock_guard<std::mutex> lock{self->mutex_};
-            self->ioServiceDone_ = true;
+            std::lock_guard<std::mutex> lock{mutex_};
+            ioServiceDone_ = true;
         }
-        self->cond_.notify_all();
+        cond_.notify_all();
     }};
     t.detach();
 }
@@ -75,7 +77,7 @@ SocketPtr ExecutorService::createSocket() {
 }
 
 TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) {
-    return std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> >(
+    return std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>>(
         new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(*socket, ctx));
 }
 
@@ -103,16 +105,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     }
 }
 
-void ExecutorService::restart() {
-    close(-1); // make sure it's closed
-    closed_ = false;
-    {
-        std::lock_guard<std::mutex> lock{mutex_};
-        ioServiceDone_ = false;
-    }
-    io_service_.restart();
-    start();
-}
+void ExecutorService::restart() { io_service_.stop(); }
 
 void ExecutorService::close(long timeoutMs) {
     bool expectedState = false;
2 changes: 0 additions & 2 deletions run-unit-tests.sh
@@ -25,8 +25,6 @@ cd $ROOT_DIR
 
 pushd tests
 
-./ConnectionFailTest
-
 export RETRY_FAILED="${RETRY_FAILED:-1}"
 
 if [ -f /gtest-parallel ]; then