Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

valgrind: fix leaks of service_engine #183

Merged
merged 6 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/core/core/service_api_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static struct _all_info_
unsigned int magic;
bool engine_ready;
bool config_completed;
::dsn::tools::tool_app *tool;
std::unique_ptr<::dsn::tools::tool_app> tool;
::dsn::service_engine *engine;
std::vector<::dsn::task_spec *> task_specs;

Expand Down Expand Up @@ -207,7 +207,7 @@ DSN_API bool dsn_mimic_app(const char *app_role, int index)
for (auto &n : nodes) {
if (n.second->spec().role_name == std::string(app_role) &&
n.second->spec().index == index) {
::dsn::task::set_tls_dsn_context(n.second, nullptr);
::dsn::task::set_tls_dsn_context(n.second.get(), nullptr);
return true;
}
}
Expand Down Expand Up @@ -280,19 +280,30 @@ namespace tools {

bool is_engine_ready() { return dsn_all.is_engine_ready(); }

tool_app *get_current_tool() { return dsn_all.tool; }
tool_app *get_current_tool() { return dsn_all.tool.get(); }

} // namespace tools
} // namespace dsn

extern void dsn_log_init();
extern void dsn_core_init();

inline void dsn_global_init()
{
// ensure perf_counters is destructed after service_engine,
// because service_engine relies on the former to monitor
// task queues length.
dsn::perf_counters::instance();
dsn::service_engine::instance();
}

bool run(const char *config_file,
const char *config_arguments,
bool sleep_after_init,
std::string &app_list)
{
dsn_global_init();

s_runtime_init_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();
Expand Down Expand Up @@ -364,8 +375,8 @@ bool run(const char *config_file,
dsn::utils::filesystem::create_directory(spec.dir_log);

// init tools
dsn_all.tool = ::dsn::utils::factory_store<::dsn::tools::tool_app>::create(
spec.tool.c_str(), ::dsn::PROVIDER_TYPE_MAIN, spec.tool.c_str());
dsn_all.tool.reset(::dsn::utils::factory_store<::dsn::tools::tool_app>::create(
spec.tool.c_str(), ::dsn::PROVIDER_TYPE_MAIN, spec.tool.c_str()));
dsn_all.tool->install(spec);

// init app specs
Expand Down Expand Up @@ -509,7 +520,7 @@ void service_app::get_all_service_apps(std::vector<service_app *> *apps)
{
const service_nodes_by_app_id &nodes = dsn_all.engine->get_all_nodes();
for (const auto &kv : nodes) {
const service_node *node = kv.second;
const service_node *node = kv.second.get();
apps->push_back(const_cast<service_app *>(node->get_service_app()));
}
}
Expand Down
57 changes: 22 additions & 35 deletions src/core/core/service_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,24 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "service_engine.h"
#include "task_engine.h"
#include "disk_engine.h"
#include "rpc_engine.h"
#include <dsn/utility/factory_store.h>

#include <dsn/utility/filesystem.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/tool-api/uri_address.h>
#include <dsn/tool-api/env_provider.h>
#include <dsn/tool-api/command_manager.h>
#include <dsn/perf_counter/perf_counter.h>
#include <dsn/perf_counter/perf_counters.h>
#include <dsn/tool_api.h>
#include <dsn/tool/node_scoper.h>

using namespace dsn::utils;

namespace dsn {

service_node::service_node(service_app_spec &app_spec)
{
_computation = nullptr;
_app_spec = app_spec;
}
service_node::service_node(service_app_spec &app_spec) { _app_spec = app_spec; }

bool service_node::rpc_register_handler(task_code code,
const char *extra_name,
Expand All @@ -75,17 +61,17 @@ error_code service_node::init_io_engine()
error_code err = ERR_OK;

// init disk engine
_node_io.disk = new disk_engine(this);
_node_io.disk = make_unique<disk_engine>(this);
aio_provider *aio = factory_store<aio_provider>::create(
spec.aio_factory_name.c_str(), ::dsn::PROVIDER_TYPE_MAIN, _node_io.disk, nullptr);
spec.aio_factory_name.c_str(), ::dsn::PROVIDER_TYPE_MAIN, _node_io.disk.get(), nullptr);
for (auto it = spec.aio_aspects.begin(); it != spec.aio_aspects.end(); it++) {
aio = factory_store<aio_provider>::create(
it->c_str(), PROVIDER_TYPE_ASPECT, _node_io.disk, aio);
it->c_str(), PROVIDER_TYPE_ASPECT, _node_io.disk.get(), aio);
}
_node_io.aio = aio;
_node_io.aio.reset(aio);

// init rpc engine
_node_io.rpc = new rpc_engine(this);
_node_io.rpc = make_unique<rpc_engine>(this);

return err;
}
Expand All @@ -96,7 +82,7 @@ error_code service_node::start_io_engine_in_main()
error_code err = ERR_OK;

// start disk engine
_node_io.disk->start(_node_io.aio);
_node_io.disk->start(_node_io.aio.get());

// start rpc engine
err = _node_io.rpc->start(_app_spec);
Expand Down Expand Up @@ -149,7 +135,7 @@ error_code service_node::start()
dsn::utils::filesystem::create_directory(spec().data_dir);

// init task engine
_computation = new task_engine(this);
_computation = make_unique<task_engine>(this);
_computation->create(_app_spec.pools);
dassert(!_computation->is_started(), "task engine must not be started at this point");

Expand Down Expand Up @@ -208,12 +194,13 @@ rpc_request_task *service_node::generate_intercepted_request_task(message_ex *re
return t;
}

service_node::~service_node() = default;

//////////////////////////////////////////////////////////////////////////////////////////

service_engine::service_engine(void)
service_engine::service_engine()
{
_env = nullptr;
_logging = nullptr;

::dsn::command_manager::instance().register_command({"engine"},
"engine - get engine internal information",
Expand All @@ -226,13 +213,15 @@ service_engine::service_engine(void)
&service_engine::get_queue_info);
}

service_engine::~service_engine() = default;

void service_engine::init_before_toollets(const service_spec &spec)
{
_spec = spec;

// init common providers (first half)
_logging = factory_store<logging_provider>::create(
spec.logging_factory_name.c_str(), ::dsn::PROVIDER_TYPE_MAIN, spec.dir_log.c_str());
_logging.reset(factory_store<logging_provider>::create(
spec.logging_factory_name.c_str(), ::dsn::PROVIDER_TYPE_MAIN, spec.dir_log.c_str()));

// init common for all per-node providers
message_ex::s_local_hash =
Expand All @@ -257,12 +246,10 @@ void service_engine::init_after_toollets()
tls_dsn.env = _env;
}

service_node *service_engine::start_node(service_app_spec &app_spec)
void service_engine::start_node(service_app_spec &app_spec)
{
auto it = _nodes_by_app_id.find(app_spec.id);
if (it != _nodes_by_app_id.end()) {
return it->second;
} else {
if (it == _nodes_by_app_id.end()) {
for (auto p : app_spec.ports) {
// union to existing node if any port is shared
if (_nodes_by_app_port.find(p) != _nodes_by_app_port.end()) {
Expand All @@ -277,16 +264,16 @@ service_node *service_engine::start_node(service_app_spec &app_spec)
}
}

auto node = new service_node(app_spec);
auto node = std::make_shared<service_node>(app_spec);
error_code err = node->start();
dassert(err == ERR_OK, "service node start failed, err = %s", err.to_string());

_nodes_by_app_id[node->id()] = node;
for (auto p1 : node->spec().ports) {
_nodes_by_app_port[p1] = node;
_nodes_by_app_port[p1] = node.get();
}

return node;
return;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
28 changes: 15 additions & 13 deletions src/core/core/service_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,20 @@ class service_node
public:
struct io_engine
{
rpc_engine *rpc;
disk_engine *disk;
aio_provider *aio;

io_engine() { memset((void *)this, 0, sizeof(io_engine)); }
std::unique_ptr<rpc_engine> rpc;
std::unique_ptr<disk_engine> disk;
std::unique_ptr<aio_provider> aio;
};

public:
explicit service_node(service_app_spec &app_spec);

rpc_engine *rpc() const { return _node_io.rpc; }
disk_engine *disk() const { return _node_io.disk; }
~service_node();

rpc_engine *rpc() const { return _node_io.rpc.get(); }
disk_engine *disk() const { return _node_io.disk.get(); }
task_engine *computation() const { return _computation.get(); }

task_engine *computation() const { return _computation; }
void get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss);
Expand All @@ -104,8 +104,8 @@ class service_node
std::unique_ptr<service_app> _entity;

service_app_spec _app_spec;
task_engine *_computation;

std::unique_ptr<task_engine> _computation;
io_engine _node_io;

private:
Expand All @@ -117,29 +117,31 @@ class service_node
error_code start_io_engine_in_main();
};

typedef std::map<int, service_node *> service_nodes_by_app_id;
typedef std::map<int, std::shared_ptr<service_node>> service_nodes_by_app_id;
class service_engine : public utils::singleton<service_engine>
{
public:
service_engine();

~service_engine();

// ServiceMode Mode() const { return _spec.Mode; }
const service_spec &spec() const { return _spec; }
env_provider *env() const { return _env; }
logging_provider *logging() const { return _logging; }
logging_provider *logging() const { return _logging.get(); }
static std::string get_runtime_info(const std::vector<std::string> &args);
static std::string get_queue_info(const std::vector<std::string> &args);

void init_before_toollets(const service_spec &spec);
void init_after_toollets();

service_node *start_node(service_app_spec &app_spec);
void start_node(service_app_spec &app_spec);
const service_nodes_by_app_id &get_all_nodes() const { return _nodes_by_app_id; }

private:
service_spec _spec;
env_provider *_env;
logging_provider *_logging;
std::unique_ptr<logging_provider> _logging;

// <port, servicenode>
typedef std::map<int, service_node *>
Expand Down
6 changes: 3 additions & 3 deletions src/core/core/tool_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ void tool_app::start_all_apps()
{
auto apps = service_engine::instance().get_all_nodes();
for (auto &kv : apps) {
task *t = new service_control_task(kv.second, true);
t->set_delay(1000 * kv.second->spec().delay_seconds);
task *t = new service_control_task(kv.second.get(), true);
t->set_delay(1000 * kv.second.get()->spec().delay_seconds);
t->enqueue();
}
}
Expand All @@ -92,7 +92,7 @@ void tool_app::stop_all_apps(bool cleanup)
{
auto apps = service_engine::instance().get_all_nodes();
for (auto &kv : apps) {
task *t = new service_control_task(kv.second, false, cleanup);
task *t = new service_control_task(kv.second.get(), false, cleanup);
t->enqueue();
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/core/tools/common/native_aio_provider.linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk,

native_linux_aio_provider::~native_linux_aio_provider()
{
auto ret = io_destroy(_ctx);
dassert(ret == 0, "io_destroy error, ret = %d", ret);

if (!_is_running) {
return;
}
_is_running = false;

auto ret = io_destroy(_ctx);
dassert(ret == 0, "io_destroy error, ret = %d", ret);

_worker.join();
}

Expand Down