Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

valgrind: fix leaks of task_worker #181

Merged
merged 4 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/dsn/tool-api/task_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class task_worker : public extensible_object<task_worker, 4>
int _index;
int _native_tid;
std::string _name;
std::thread *_thread;
std::unique_ptr<std::thread> _thread;
bool _is_running;
utils::notify_event _started;
int _processed_task_count;
Expand Down
2 changes: 2 additions & 0 deletions include/dsn/tool-api/timer_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class timer_service
public:
timer_service(service_node *node, timer_service *inner_provider) { _node = node; }

virtual ~timer_service() = default;

virtual void start() = 0;

// after milliseconds, the provider should call task->enqueue()
Expand Down
5 changes: 1 addition & 4 deletions src/core/core/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ task_queue::task_queue(task_worker_pool *pool, int index, task_queue *inner_prov
_spec = (threadpool_spec *)&pool->spec();
}

task_queue::~task_queue()
{
perf_counters::instance().remove_counter(_queue_length_counter->full_name());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这段代码解释下,_queue_length_counter 本身是一个 perf_counter_wrapper,它析构时会调用

    perf_counters::instance().remove_counter(this->full_name());

所以执行顺序是

  1. task_queue 析构
  2. _queue_length_counter 被 removed
  3. _queue_length_counter 析构
  4. _queue_length_counter 又被 removed,这里是第二次 remove,valgrind 报错 invalid read.

}
task_queue::~task_queue() = default;

void task_queue::enqueue_internal(task *task)
{
Expand Down
158 changes: 20 additions & 138 deletions src/core/core/task_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,10 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <dsn/tool-api/task_worker.h>
#include "task_engine.h"
#include <sstream>
#include <errno.h>

#ifdef _WIN32

#else
#include <pthread.h>

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#ifdef __APPLE__
#include <mach/thread_policy.h>
#endif
#include <dsn/utility/smart_pointers.h>

#endif
#include "task_engine.h"

namespace dsn {

Expand All @@ -77,7 +53,14 @@ task_worker::task_worker(task_worker_pool *pool,
_processed_task_count = 0;
}

task_worker::~task_worker() { stop(); }
task_worker::~task_worker()
{
if (!_is_running)
return;

// TODO(wutao1): use join, detach is not work with valgrind
_thread->detach();
}

void task_worker::start()
{
Expand All @@ -86,7 +69,7 @@ void task_worker::start()

_is_running = true;

_thread = new std::thread(std::bind(&task_worker::run_internal, this));
_thread = make_unique<std::thread>(std::bind(&task_worker::run_internal, this));

_started.wait();
}
Expand All @@ -99,116 +82,39 @@ void task_worker::stop()
_is_running = false;

_thread->join();
delete _thread;
_thread = nullptr;

_is_running = false;
}

void task_worker::set_name(const char *name)
{
#ifdef _WIN32

#ifndef MS_VC_EXCEPTION
#define MS_VC_EXCEPTION 0x406D1388
#endif

typedef struct tagTHREADNAME_INFO
{
uint32_t dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
uint32_t dwThreadID; // Thread ID (-1=caller thread).
uint32_t dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;

THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = name;
info.dwThreadID = (uint32_t)-1;
info.dwFlags = 0;

__try {
::RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(uint32_t), (ULONG_PTR *)&info);
} __except (EXCEPTION_CONTINUE_EXECUTION) {
}

#else
std::string sname(name);
auto thread_name = sname
#ifdef __linux__
.substr(0, (16 - 1))
#endif
;
auto thread_name = sname.substr(0, (16 - 1));
auto tid = pthread_self();
int err = 0;
#ifdef __FreeBSD__
pthread_set_name_np(tid, thread_name.c_str());
#elif defined(__linux__)
err = pthread_setname_np(tid, thread_name.c_str());
#elif defined(__APPLE__)
err = pthread_setname_np(thread_name.c_str());
#endif
int err = pthread_setname_np(tid, thread_name.c_str());
if (err != 0) {
dwarn("Fail to set pthread name. err = %d", err);
}
#endif
}

void task_worker::set_priority(worker_priority_t pri)
{
#ifndef _WIN32
#ifndef __linux__
static int policy = SCHED_OTHER;
#endif
static int prio_max =
#ifdef __linux__
-20;
#else
sched_get_priority_max(policy);
#endif
static int prio_min =
#ifdef __linux__
19;
#else
sched_get_priority_min(policy);
#endif
static int prio_max = -20;
static int prio_min = 19;
static int prio_middle = ((prio_min + prio_max + 1) / 2);
#endif

static int g_thread_priority_map[] = {
#ifdef _WIN32
THREAD_PRIORITY_LOWEST,
THREAD_PRIORITY_BELOW_NORMAL,
THREAD_PRIORITY_NORMAL,
THREAD_PRIORITY_ABOVE_NORMAL,
THREAD_PRIORITY_HIGHEST
#else
prio_min, (prio_min + prio_middle) / 2, prio_middle, (prio_middle + prio_max) / 2, prio_max
#endif
};
static int g_thread_priority_map[] = {prio_min,
(prio_min + prio_middle) / 2,
prio_middle,
(prio_middle + prio_max) / 2,
prio_max};

static_assert(ARRAYSIZE(g_thread_priority_map) == THREAD_xPRIORITY_COUNT,
"ARRAYSIZE(g_thread_priority_map) != THREAD_xPRIORITY_COUNT");

int prio = g_thread_priority_map[static_cast<int>(pri)];
bool succ = true;
#if !defined(_WIN32) && !defined(__linux__)
struct sched_param param;
memset(&param, 0, sizeof(struct sched_param));
param.sched_priority = prio;
#endif

#ifdef _WIN32
succ = (::SetThreadPriority(::GetCurrentThread(), prio) == TRUE);
#elif defined(__linux__)
if ((nice(prio) == -1) && (errno != 0)) {
succ = false;
}
#else
succ = (pthread_setschedparam(pthread_self(), policy, &param) == 0);
//# error "not implemented"
#endif

if (!succ) {
dwarn("You may need priviledge to set thread priority. errno = %d", errno);
}
Expand All @@ -226,23 +132,6 @@ void task_worker::set_affinity(uint64_t affinity)
}

int err = 0;
#ifdef _WIN32
if (::SetThreadAffinityMask(::GetCurrentThread(), static_cast<DWORD_PTR>(affinity)) == 0) {
err = static_cast<int>(::GetLastError());
}
#elif defined(__APPLE__)
thread_affinity_policy_data_t policy;
policy.affinity_tag = static_cast<integer_t>(affinity);
err = static_cast<int>(thread_policy_set(static_cast<thread_t>(::dsn::utils::get_current_tid()),
THREAD_AFFINITY_POLICY,
(thread_policy_t)&policy,
THREAD_AFFINITY_POLICY_COUNT));
#else
#ifdef __FreeBSD__
#ifndef cpu_set_t
#define cpu_set_t cpuset_t
#endif
#endif
cpu_set_t cpuset;
int nr_bits = std::min(nr_cpu, static_cast<int>(sizeof(affinity) * 8));

Expand All @@ -253,7 +142,6 @@ void task_worker::set_affinity(uint64_t affinity)
}
}
err = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
#endif

if (err != 0) {
dwarn("Fail to set thread affinity. err = %d", err);
Expand Down Expand Up @@ -307,7 +195,6 @@ void task_worker::loop()
task_queue *q = queue();
int best_batch_size = pool_spec().dequeue_batch_size;

// try {
while (_is_running) {
int batch_size = best_batch_size;
task *task = q->dequeue(batch_size), *next;
Expand Down Expand Up @@ -336,11 +223,6 @@ void task_worker::loop()

_processed_task_count += batch_size;
}
/*}
catch (std::exception& ex)
{
dassert (false, "%s: unhandled exception '%s'", name().c_str(), ex.what());
}*/
}

const threadpool_spec &task_worker::pool_spec() const { return pool()->spec(); }
Expand Down
32 changes: 17 additions & 15 deletions src/core/tools/common/simple_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,19 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "simple_task_queue.h"

namespace dsn {
namespace tools {

simple_timer_service::simple_timer_service(service_node *node, timer_service *inner_provider)
: timer_service(node, inner_provider)
{
_worker = nullptr;
}

void simple_timer_service::start()
{
_worker = std::shared_ptr<std::thread>(new std::thread([this]() {
_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);

char buffer[128];
Expand All @@ -55,8 +46,18 @@ void simple_timer_service::start()
task_worker::set_priority(worker_priority_t::THREAD_xPRIORITY_ABOVE_NORMAL);

boost::asio::io_service::work work(_ios);
_ios.run();
}));
boost::system::error_code ec;
_ios.run(ec);
if (ec) {
dassert(false, "io_service in simple_timer_service run failed: %s", ec.message().data());
}
});
}

simple_timer_service::~simple_timer_service()
{
_ios.stop();
_worker.join();
}

void simple_timer_service::add_timer(task *task)
Expand Down Expand Up @@ -93,5 +94,6 @@ task *simple_task_queue::dequeue(/*inout*/ int &batch_size)
batch_size = 1;
return t;
}
}
}

} // namespace tools
} // namespace dsn
20 changes: 8 additions & 12 deletions src/core/tools/common/simple_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool_api.h>
Expand All @@ -46,6 +37,8 @@ class simple_task_queue : public task_queue
public:
simple_task_queue(task_worker_pool *pool, int index, task_queue *inner_provider);

~simple_task_queue() override = default;

virtual void enqueue(task *task) override;
virtual task *dequeue(/*inout*/ int &batch_size) override;

Expand All @@ -59,14 +52,17 @@ class simple_timer_service : public timer_service
public:
simple_timer_service(service_node *node, timer_service *inner_provider);

~simple_timer_service() override;

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) override;

virtual void start() override;

private:
boost::asio::io_service _ios;
std::shared_ptr<std::thread> _worker;
std::thread _worker;
};
}
}

} // namespace tools
} // namespace dsn