Skip to content

Commit

Permalink
Debugging missing stop/starts from threads that are spawned by blas, …
Browse files Browse the repository at this point in the history
…cuda, starpu, etc.
  • Loading branch information
khuck committed Jan 5, 2023
1 parent 4f66b60 commit a530b94
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 55 deletions.
65 changes: 44 additions & 21 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1344,9 +1344,15 @@ std::shared_ptr<task_wrapper> new_task(
{
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr;
}
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) { return nullptr; }
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr;
}
const std::string apex_internal("apex_internal");
if (starts_with(name, apex_internal)) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
Expand All @@ -1371,13 +1377,19 @@ std::shared_ptr<task_wrapper> new_task(
const std::shared_ptr<task_wrapper> parent_task) {
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) { return nullptr; }
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
// get the Apex static instance
apex* instance = apex::instance();
// protect against calls after finalization
if (!instance || _exited) { return nullptr; }
if (!instance || _exited) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
task_identifier * id = task_identifier::get_task_id(function_address);
std::shared_ptr<task_wrapper>
tt_ptr(_new_task(id, task_id, parent_task, instance));
Expand All @@ -1389,9 +1401,13 @@ std::shared_ptr<task_wrapper> update_task(
const std::string &timer_name) {
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) { return nullptr; }
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
if (wrapper == nullptr) {
// get the Apex static instance
apex* instance = apex::instance();
Expand Down Expand Up @@ -1429,9 +1445,13 @@ std::shared_ptr<task_wrapper> update_task(
const apex_function_address function_address) {
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) { return nullptr; }
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
if (wrapper == nullptr) {
// get the Apex static instance
apex* instance = apex::instance();
Expand Down Expand Up @@ -1672,13 +1692,16 @@ void finalize()
FUNCTION_ENTER
// FIRST FIRST, check if we have orphaned threads...
//printf("\n\n*********** Thread count: %lu!\n\n\n", instance->known_threads.size());
if (!instance->known_threads.empty()) {
thread_instance& ti = thread_instance::instance();
//printf("\n\n*********** Orphaned children!\n\n\n");
//printf("I am thread: %p\n", &ti);
for (thread_instance* t : instance->known_threads) {
//printf("\tThread: %p\n", t);
t->clear_all_profilers();
{
read_lock_type l(instance->listener_mutex);
if (!instance->known_threads.empty()) {
thread_instance& ti = thread_instance::instance();
//printf("\n\n*********** Orphaned children!\n\n\n");
//printf("I am thread: %p\n", &ti);
for (thread_instance* t : instance->known_threads) {
//printf("\tThread: %p\n", t);
t->clear_all_profilers();
}
}
}
// FIRST, stop the top level timer, while the infrastructure is still
Expand Down Expand Up @@ -1852,9 +1875,10 @@ void register_thread(const std::string &name,
//printf("New thread: %p\n", &(*twp));
thread_instance::set_top_level_timer(twp);
}
static std::mutex _mutex;
unique_lock<mutex> l(_mutex);
instance->known_threads.insert(&ti);
{
write_lock_type l(instance->listener_mutex);
instance->known_threads.insert(&ti);
}
}

void exit_thread(void)
Expand All @@ -1869,9 +1893,8 @@ void exit_thread(void)
if (_exiting) return;
_exiting = true;
{
static std::mutex _mutex;
unique_lock<mutex> l(_mutex);
thread_instance& ti = thread_instance::instance(false);
write_lock_type l(instance->listener_mutex);
instance->known_threads.erase(&ti);
}
auto tmp = thread_instance::get_top_level_timer();
Expand Down
89 changes: 59 additions & 30 deletions src/apex/apex_starpu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ extern "C" {
// printf("worker[%d]: w_total_executed = %"PRId64", w_cumul_execution_time = %lf\n", workerid, w_total_executed, w_cumul_execution_time);

std::stringstream ss1;
ss1 << "Worker " << workerid << " w_total_executed";
ss1 << " w_total_executed : Worker " << std::setfill('0') << std::setw(3) << workerid;
std::string tmp1{ss1.str()};
apex::sample_value( tmp1, w_total_executed );

std::stringstream ss2;
ss2 << "Worker " << workerid << " w_cumul_execution_time";
ss2 << " w_cumul_execution_time (us) : Worker " << std::setfill('0') << std::setw(3) << workerid;
std::string tmp2{ss2.str()};
apex::sample_value( tmp2, w_cumul_execution_time );
}
Expand All @@ -92,7 +92,6 @@ extern "C" {
*/

void enable_counters(starpu_prof_tool_info* prof_info, starpu_prof_tool_event_info* event_info, starpu_prof_tool_api_info* api_info ) {
printf("%s\n", __func__);
}

/* This one is called at the end of the initialization.
Expand Down Expand Up @@ -172,7 +171,7 @@ extern "C" {

/******************************************************************************/

void myfunction_cb( struct starpu_prof_tool_info* prof_info,
void initfunction_cb( struct starpu_prof_tool_info* prof_info,
union starpu_prof_tool_event_info* event_info,
struct starpu_prof_tool_api_info* api_info ) {

Expand Down Expand Up @@ -205,26 +204,12 @@ extern "C" {
break;
case starpu_prof_tool_event_driver_deinit:
case starpu_prof_tool_event_driver_init_end:
case starpu_prof_tool_event_end_cpu_exec:
case starpu_prof_tool_event_end_gpu_exec:
case starpu_prof_tool_event_end_transfer:
enter = false;
break;
case starpu_prof_tool_event_driver_init_start:
info << ": " << device_name.c_str(); // << ":" << prof_info->device_number << "}]";
event_name = event_name + info.str();
break;
case starpu_prof_tool_event_start_cpu_exec:
case starpu_prof_tool_event_start_gpu_exec:
info << ": " << device_name.c_str(); // << ":" << prof_info->device_number << "}]";
info << " : UNRESOLVED ADDR " << std::hex << prof_info->fun_ptr;
event_name = event_name + info.str();
break;
case starpu_prof_tool_event_start_transfer:
info << "[{ memnode " << prof_info->memnode << " }]";
event_name = event_name + info.str();
std::cout << "Transfer start " << event_name << std::endl;
break;
default:
std::cout << "Unknown callback " << prof_info->event_type << std::endl;
break;
Expand All @@ -246,7 +231,49 @@ extern "C" {
apex::stop(t);
my_stack.pop();
}
}
}

/**
 * StarPU profiling callback for CPU/GPU task execution events.
 *
 * A start_cpu_exec / start_gpu_exec event creates and starts an APEX timer
 * and pushes it on a thread-local stack. The matching end_cpu_exec /
 * end_gpu_exec event stops and pops the timer on top of that stack.
 *
 * @param prof_info   Event description; event_type, driver_type and fun_ptr
 *                    are read.
 * @param event_info  Not used by this callback.
 * @param api_info    Not used by this callback.
 */
void myfunction_cb( struct starpu_prof_tool_info* prof_info,
        union starpu_prof_tool_event_info* event_info,
        struct starpu_prof_tool_api_info* api_info ) {

    std::string event_name {event_types[prof_info->event_type]};
    std::string device_name {device_types[prof_info->driver_type]};
    std::stringstream info;

    // true  -> this event opens a timer; false -> it closes the current one.
    bool enter = true;
    switch( prof_info->event_type ) {
        case starpu_prof_tool_event_end_cpu_exec:
        case starpu_prof_tool_event_end_gpu_exec:
            // End events must take the stop/pop path below. Without this,
            // every end event would start a second timer and nothing would
            // ever be stopped.
            enter = false;
            break;
        case starpu_prof_tool_event_start_cpu_exec:
        case starpu_prof_tool_event_start_gpu_exec:
            info << ": " << device_name.c_str(); // << ":" << prof_info->device_number << "}]";
            info << " : UNRESOLVED ADDR " << std::hex << prof_info->fun_ptr;
            event_name = event_name + info.str();
            break;
        default:
            // Unexpected event type: report it, then fall through to the
            // "enter" path with the bare event name (unchanged behaviour).
            std::cout << "Unknown callback " << prof_info->event_type << std::endl;
            break;
    }

    // One stack per thread, so a stop is paired with the most recent start
    // issued on the same thread.
    static thread_local std::stack<std::shared_ptr<apex::task_wrapper> > my_stack;
    if (enter) {
        auto t = apex::new_task(event_name);
        apex::start(t);
        my_stack.push(t);
    } else {
        if (my_stack.size() == 0) {
            std::cerr << "APEX Timer stack is empty, bug in StarPU support! "
                << event_name
                << std::endl;
            return;
        }
        auto t = my_stack.top();
        apex::stop(t);
        my_stack.pop();
    }
}

void xferfunction_cb( struct starpu_prof_tool_info* prof_info,
union starpu_prof_tool_event_info* event_info,
Expand Down Expand Up @@ -294,7 +321,7 @@ void starpu_prof_tool_library_register(starpu_prof_tool_entry_register_func reg,
Otherwise the counters might not be ready yet */
//reg( starpu_prof_tool_event_init_end, &init_counters, info );
/* This one must be called at the end, but I don't know precisely when yet */
//reg( starpu_prof_tool_event_terminate, &finalize_counters, info );
reg( starpu_prof_tool_event_terminate, &finalize_counters, info );

device_types[starpu_prof_tool_driver_cpu] = "CPU";
device_types[starpu_prof_tool_driver_gpu] = "GPU";
Expand All @@ -304,8 +331,8 @@ void starpu_prof_tool_library_register(starpu_prof_tool_entry_register_func reg,
event_types[starpu_prof_tool_event_terminate] = "StarPU";
event_types[starpu_prof_tool_event_init_begin] = "StarPU init";
event_types[starpu_prof_tool_event_init_end] = "StarPU init";
event_types[starpu_prof_tool_event_driver_init] = "StarPU driver ";
event_types[starpu_prof_tool_event_driver_deinit] = "StarPU driver ";
event_types[starpu_prof_tool_event_driver_init] = "StarPU driver";
event_types[starpu_prof_tool_event_driver_deinit] = "StarPU driver";
event_types[starpu_prof_tool_event_driver_init_start] = "StarPU driver init ";
event_types[starpu_prof_tool_event_driver_init_end] = "StarPU driver init ";
event_types[starpu_prof_tool_event_start_cpu_exec] = "StarPU exec ";
Expand All @@ -317,18 +344,20 @@ void starpu_prof_tool_library_register(starpu_prof_tool_entry_register_func reg,
event_types[starpu_prof_tool_event_user_start] = "StarPU user event ";
event_types[starpu_prof_tool_event_user_end] = "StarPU user event ";

reg( starpu_prof_tool_event_init_begin, &myfunction_cb, info );
reg( starpu_prof_tool_event_init_end, &myfunction_cb, info );
reg( starpu_prof_tool_event_init, &myfunction_cb, info );
reg( starpu_prof_tool_event_terminate, &myfunction_cb, info );
reg( starpu_prof_tool_event_driver_init, &myfunction_cb, info );
reg( starpu_prof_tool_event_driver_deinit, &myfunction_cb, info );
reg( starpu_prof_tool_event_driver_init_start, &myfunction_cb, info );
reg( starpu_prof_tool_event_driver_init_end, &myfunction_cb, info );
reg( starpu_prof_tool_event_init_begin, &initfunction_cb, info );
reg( starpu_prof_tool_event_init_end, &initfunction_cb, info );
//reg( starpu_prof_tool_event_init, &initfunction_cb, info );
//reg( starpu_prof_tool_event_terminate, &initfunction_cb, info );
reg( starpu_prof_tool_event_driver_init, &initfunction_cb, info );
reg( starpu_prof_tool_event_driver_deinit, &initfunction_cb, info );
reg( starpu_prof_tool_event_driver_init_start, &initfunction_cb, info );
reg( starpu_prof_tool_event_driver_init_end, &initfunction_cb, info );

reg( starpu_prof_tool_event_start_cpu_exec, &myfunction_cb, info );
reg( starpu_prof_tool_event_end_cpu_exec, &myfunction_cb, info );
reg( starpu_prof_tool_event_start_gpu_exec, &myfunction_cb, info );
reg( starpu_prof_tool_event_end_gpu_exec, &myfunction_cb, info );

reg( starpu_prof_tool_event_start_transfer, &xferfunction_cb, info );
reg( starpu_prof_tool_event_end_transfer, &xferfunction_cb, info );
}
Expand Down
10 changes: 6 additions & 4 deletions src/wrappers/pthread_wrapper_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ DEFINE_CONSTRUCTOR(apex_init_static_void)
DEFINE_DESTRUCTOR(apex_finalize_static_void)

void apex_init_static_void() {
printf("Here! %s\n",__func__);
//printf("Here! %s\n",__func__);
apex::init("APEX Pthread Wrapper",0,1);
}
void apex_finalize_static_void() {
printf("There! %s\n",__func__);
//printf("There! %s\n",__func__);
apex::finalize();
}
#endif // HAS_CONSTRUCTORS
Expand Down Expand Up @@ -234,7 +234,8 @@ int apex_pthread_create_wrapper(pthread_create_p pthread_create_call,
// disable the memory wrapper
apex::in_apex prevent_problems;
std::shared_ptr<apex::task_wrapper> parent_task = apex::new_task("pthread_create");
apex::start(parent_task);
// can be null after finalize has started.
if (parent_task != nullptr) apex::start(parent_task);
// JUST ONCE, create the key
(void) pthread_once(&key_once, make_key);
// get the thread-local variable
Expand Down Expand Up @@ -266,7 +267,8 @@ int apex_pthread_create_wrapper(pthread_create_p pthread_create_call,
*/
wrapper->_wrapped = false;
}
apex::stop(parent_task);
// can be null after finalize has started.
if (parent_task != nullptr) apex::stop(parent_task);
return retval;
}

Expand Down

0 comments on commit a530b94

Please sign in to comment.