Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix(graceful_exit): adjust the destructing order #843

Merged
merged 1 commit into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/tool-api/timer_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class timer_service
virtual ~timer_service() = default;

virtual void start() = 0;
virtual void stop() = 0;

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) = 0;
Expand Down
6 changes: 5 additions & 1 deletion src/aio/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ GTEST_API_ int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
dsn_run_config("config.ini", false);
return RUN_ALL_TESTS();
int g_test_ret = RUN_ALL_TESTS();
#ifndef ENABLE_GCOV
dsn_exit(g_test_ret);
#endif
return g_test_ret;
}
9 changes: 8 additions & 1 deletion src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,17 @@ meta_service::meta_service()
_access_controller = security::create_meta_access_controller();
}

meta_service::~meta_service()
meta_service::~meta_service() { stop(); }

// Stops the meta service.
//
// While holding the write lock on _meta_lock, cancels the tasks tracked by
// _tracker and unregisters the control commands, then clears the _started
// flag. If the service is not started the call does nothing, so calling it
// more than once is harmless.
void meta_service::stop()
{
    zauto_write_lock write_guard(_meta_lock);
    if (_started.load()) {
        _tracker.cancel_outstanding_tasks();
        unregister_ctrl_commands();
        // mark as stopped last, so a later stop() takes the no-op path
        _started = false;
    }
}

bool meta_service::check_freeze() const
Expand Down
1 change: 1 addition & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class meta_service : public serverlet<meta_service>
virtual ~meta_service();

error_code start();
void stop();

const replication_options &get_options() const { return _opts; }
const meta_options &get_meta_options() const { return _meta_opts; }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_service_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ error_code meta_service_app::start(const std::vector<std::string> &args)

error_code meta_service_app::stop(bool /*cleanup*/)
{
_service.reset(nullptr);
_service->stop();
return ERR_OK;
}
} // namespace service
Expand Down
14 changes: 13 additions & 1 deletion src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_max_concurrent_bulk_load_downloading_count(5),
_learn_app_concurrent_count(0),
_fs_manager(false),
_bulk_load_downloading_count(0)
_bulk_load_downloading_count(0),
_is_running(false)
{
#ifdef DSN_ENABLE_GPERF
_release_tcmalloc_memory_command = nullptr;
Expand Down Expand Up @@ -809,6 +810,10 @@ void replica_stub::initialize_fs_manager(std::vector<std::string> &data_dirs,

void replica_stub::initialize_start()
{
if (_is_running) {
return;
}

// start timer for configuration sync
if (!_options.config_sync_disabled) {
_config_sync_timer_task =
Expand Down Expand Up @@ -858,6 +863,8 @@ void replica_stub::initialize_start()
} else {
_state = NS_Connected;
}

_is_running = true;
}

dsn::error_code replica_stub::on_kill_replica(gpid id)
Expand Down Expand Up @@ -2475,6 +2482,10 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,

void replica_stub::close()
{
if (!_is_running) {
return;
}

_tracker.cancel_outstanding_tasks();

// this replica may not be opened
Expand Down Expand Up @@ -2579,6 +2590,7 @@ void replica_stub::close()
_replicas.erase(_replicas.begin());
}
}
_is_running = false;
}

std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new)
Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// replica count executing bulk load downloading concurrently
std::atomic_int _bulk_load_downloading_count;

bool _is_running;

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
Expand Down
1 change: 1 addition & 0 deletions src/runtime/rpc/rpc_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class rpc_engine
//
::dsn::error_code start(const service_app_spec &spec);
void start_serving() { _is_serving = true; }
void stop_serving() { _is_serving = false; }

//
// rpc registrations
Expand Down
36 changes: 20 additions & 16 deletions src/runtime/service_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "runtime/task/task_engine.h"
#include "runtime/rpc/rpc_engine.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/tool-api/env_provider.h>
Expand Down Expand Up @@ -165,7 +166,12 @@ rpc_request_task *service_node::generate_intercepted_request_task(message_ex *re
return t;
}

service_node::~service_node() = default;
// Tears the node down in a fixed order: first tell the rpc engine to stop
// serving, then stop the app, and only then stop _computation.
// NOTE(review): _computation is presumably the node's task engine, and the
// `false` passed to stop_app presumably means "no cleanup" - confirm.
// NOTE(review): assumes _rpc and _computation are non-null whenever a node is
// destroyed - confirm for nodes that were never started.
service_node::~service_node()
{
_rpc->stop_serving();
stop_app(false);
_computation->stop();
}

//////////////////////////////////////////////////////////////////////////////////////////

Expand All @@ -188,6 +194,8 @@ service_engine::service_engine()

// Empties the app-id -> node map before unregistering the two command
// handlers.
service_engine::~service_engine()
{
// NOTE(review): presumably this drops the engine's references to the service
// nodes so that they are destroyed here, before the handlers go away -
// confirm nothing else keeps the nodes alive.
_nodes_by_app_id.clear();

UNREGISTER_VALID_HANDLER(_get_runtime_info_cmd);
UNREGISTER_VALID_HANDLER(_get_queue_info_cmd);
}
Expand Down Expand Up @@ -218,32 +226,28 @@ void service_engine::init_after_toollets()

void service_engine::start_node(service_app_spec &app_spec)
{
std::unordered_map<int, std::string> app_name_by_port;
auto it = _nodes_by_app_id.find(app_spec.id);
if (it == _nodes_by_app_id.end()) {
for (auto p : app_spec.ports) {
// union to existing node if any port is shared
if (_nodes_by_app_port.find(p) != _nodes_by_app_port.end()) {
service_node *n = _nodes_by_app_port[p];

dassert(false,
"network port %d usage confliction for %s vs %s, "
"please reconfig",
p,
n->full_name(),
app_spec.full_name.c_str());
auto it = app_name_by_port.find(p);
if (it != app_name_by_port.end()) {
dassert_f(false,
"network port {} usage confliction for {} vs {}, "
"please reconfig",
p,
it->second,
app_spec.full_name);
}
app_name_by_port.emplace(p, app_spec.full_name);
}

auto node = std::make_shared<service_node>(app_spec);
error_code err = node->start();
dassert(err == ERR_OK, "service node start failed, err = %s", err.to_string());
dassert_f(err == ERR_OK, "service node start failed, err = {}", err.to_string());

_nodes_by_app_id[node->id()] = node;
for (auto p1 : node->spec().ports) {
_nodes_by_app_port[p1] = node.get();
}

return;
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/runtime/service_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,8 @@ class service_engine : public utils::singleton<service_engine>

bool _simulator;

// <port, servicenode>
typedef std::map<int, service_node *>
node_engines_by_port; // multiple ports may share the same node
// map app_id to service_node
service_nodes_by_app_id _nodes_by_app_id;
node_engines_by_port _nodes_by_app_port;
};

// ------------ inline impl ---------------------
Expand Down
14 changes: 12 additions & 2 deletions src/runtime/task/simple_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ namespace dsn {
namespace tools {

simple_timer_service::simple_timer_service(service_node *node, timer_service *inner_provider)
: timer_service(node, inner_provider)
: timer_service(node, inner_provider), _is_running(false)
{
}

void simple_timer_service::start()
{
if (_is_running) {
return;
}

_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);

Expand All @@ -53,12 +57,18 @@ void simple_timer_service::start()
false, "io_service in simple_timer_service run failed: %s", ec.message().data());
}
});
_is_running = true;
}

simple_timer_service::~simple_timer_service()
// Shuts the timer service down: stops the io_service and joins the worker
// thread. Does nothing when the service is not running, so it may be called
// repeatedly.
void simple_timer_service::stop()
{
    if (_is_running) {
        // stop the io_service first, then wait for the worker thread to finish
        _ios.stop();
        _worker.join();
        _is_running = false;
    }
}

void simple_timer_service::add_timer(task *task)
Expand Down
5 changes: 4 additions & 1 deletion src/runtime/task/simple_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ class simple_timer_service : public timer_service
public:
simple_timer_service(service_node *node, timer_service *inner_provider);

~simple_timer_service() override;
~simple_timer_service() override { stop(); }

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) override;

virtual void start() override;

virtual void stop() override;

private:
boost::asio::io_service _ios;
std::thread _worker;
bool _is_running;
};

} // namespace tools
Expand Down
57 changes: 46 additions & 11 deletions src/runtime/task/task_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
* THE SOFTWARE.
*/

#include <dsn/dist/fmt_logging.h>

#include "task_engine.h"

using namespace dsn::utils;
Expand Down Expand Up @@ -78,23 +80,42 @@ void task_worker_pool::start()
if (_is_running)
return;

for (auto &tsvc : _per_queue_timer_svcs)
for (auto &tsvc : _per_queue_timer_svcs) {
tsvc->start();
for (auto &wk : _workers)
}
for (auto &wk : _workers) {
wk->start();
}

ddebug("[%s] thread pool [%s] started, pool_code = %s, worker_count = %d, worker_share_core = "
"%s, partitioned = %s, ...",
_node->full_name(),
_spec.name.c_str(),
_spec.pool_code.to_string(),
_spec.worker_count,
_spec.worker_share_core ? "true" : "false",
_spec.partitioned ? "true" : "false");
ddebug_f(
"[{}]: thread pool [{}] started, pool_code = {}, worker_count = {}, worker_share_core = "
"{}, partitioned = {}, ...",
_node->full_name(),
_spec.name,
_spec.pool_code.to_string(),
_spec.worker_count,
_spec.worker_share_core ? "true" : "false",
_spec.partitioned ? "true" : "false");

_is_running = true;
}

void task_worker_pool::stop()
{
if (!_is_running) {
return;
}

for (auto &tsvc : _per_queue_timer_svcs) {
tsvc->stop();
}
for (auto &wk : _workers) {
wk->stop();
}
_is_running = false;
ddebug_f("[{}]: thread pool {} stopped", _node->full_name(), _spec.name);
}

void task_worker_pool::add_timer(task *t)
{
dassert(t->delay_milliseconds() > 0,
Expand Down Expand Up @@ -213,8 +234,22 @@ void task_engine::start()
if (pl)
pl->start();
}

_is_running = true;
ddebug_f("[{}]: task engine started", _node->full_name());
}

void task_engine::stop()
{
if (!_is_running) {
return;
}

for (auto &pl : _pools) {
if (pl)
pl->stop();
}
_is_running = false;
ddebug_f("[{}]: task engine stopped", _node->full_name());
}

volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code, int hash)
Expand Down
3 changes: 3 additions & 0 deletions src/runtime/task/task_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class task_worker_pool
// service management
void create();
void start();
void stop();

// task processing
void enqueue(task *task);
Expand Down Expand Up @@ -95,12 +96,14 @@ class task_engine
{
public:
task_engine(service_node *node);
~task_engine() { stop(); }

//
// service management routines
//
void create(const std::list<dsn::threadpool_code> &pools);
void start();
void stop();

//
// task management routines
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/task/task_engine.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class sim_timer_service : public timer_service
virtual void add_timer(task *task) override;

virtual void start() override {}

virtual void stop() override {}
};

class sim_task_queue : public task_queue
Expand Down