Skip to content

Commit

Permalink
More threaded statistic support
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed Jul 8, 2022
1 parent 7c50ca2 commit 75e59ff
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/apex/apex_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ typedef struct _profile
size_t bytes_allocated; /*!< total bytes allocated in this task */
size_t bytes_freed; /*!< total bytes freed in this task */
int times_reset; /*!< How many times was this timer reset */
size_t num_threads; /*!< How many threads have seen this timer? */
} apex_profile;

/** Rather than use void pointers everywhere, be explicit about
Expand Down
7 changes: 5 additions & 2 deletions src/apex/dependency_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ double Node::writeNodeJSON(std::ofstream& outfile, double total, size_t indent)
if (data->get_tree_name().find("Synchronize") != std::string::npos) acc = 0.0;
double ncalls = (calls == 0) ? 1 : calls;
outfile << "\"metrics\": {\"time\": " << excl
<< ", \"time (inc)\": " << acc
<< ", \"total time (inc)\": " << acc
<< ", \"time (inc)\": " << (acc / (double)(thread_ids.size()))
<< ", \"threads\": " << thread_ids.size()
<< ", \"min (inc)\": " << min
<< ", \"max (inc)\": " << max
<< ", \"sumsqr (inc)\": " << sumsqr
Expand Down Expand Up @@ -291,14 +293,15 @@ void Node::writeTAUCallpath(std::ofstream& outfile, std::string prefix) {
return;
}

void Node::addAccumulated(double value, bool is_resume) {
void Node::addAccumulated(double value, bool is_resume, uint64_t thread_id) {
static std::mutex m;
m.lock();
if (!is_resume) { calls+=1; }
accumulated = accumulated + value;
if (min == 0.0 || value < min) { min = value; }
if (value > max) { max = value; }
sumsqr = sumsqr + (value*value);
thread_ids.insert(thread_id);
m.unlock();
}

Expand Down
4 changes: 3 additions & 1 deletion src/apex/dependency_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <iostream>
#include <fstream>
#include <atomic>
#include <set>
#include "task_identifier.hpp"

namespace apex {
Expand All @@ -29,6 +30,7 @@ class Node {
double max;
double sumsqr;
size_t index;
std::set<uint64_t> thread_ids;
std::unordered_map<task_identifier, Node*> children;
static std::mutex treeMutex;
static std::atomic<size_t> nodeCount;
Expand All @@ -52,7 +54,7 @@ class Node {
size_t getCount() { return count; }
size_t getCalls() { return calls; }
double getAccumulated() { return accumulated; }
void addAccumulated(double value, bool is_resume);
void addAccumulated(double value, bool is_resume, uint64_t thread_id);
size_t getIndex() { return index; };
std::string getName() { return data->get_name(); };
void writeNode(std::ofstream& outfile, double total);
Expand Down
34 changes: 25 additions & 9 deletions src/apex/profile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "apex_options.hpp"
#include "apex_types.h"
#include "string.h"
#include <set>

// Use this if you want the min, max and stddev.
#define FULL_STATISTICS
Expand All @@ -30,6 +31,7 @@ class profile {
* class will have its own mutex, controlling access to the data in
* _profile. Only needed when updating the values. */
std::mutex _mtx;
std::set<uint64_t> thread_ids;
public:
profile(double initial, int num_metrics, double * papi_metrics, bool
yielded = false, apex_profile_type type = APEX_TIMER) {
Expand Down Expand Up @@ -60,6 +62,7 @@ class profile {
_profile.frees = 0;
_profile.bytes_allocated = 0;
_profile.bytes_freed = 0;
_profile.num_threads = 1;
};
profile(double initial, int num_metrics, double * papi_metrics, bool
yielded, double allocations, double frees, double bytes_allocated,
Expand Down Expand Up @@ -90,14 +93,15 @@ class profile {
_profile.frees = frees;
_profile.bytes_allocated = bytes_allocated;
_profile.bytes_freed = bytes_freed;
_profile.num_threads = 1;
};
/* This constructor is so that we can create a dummy wrapper around profile
* data after we've done a reduction across ranks. */
profile(apex_profile * values) {
memcpy(&_profile, values, sizeof(apex_profile));
}
void increment(double increase, int num_metrics, double * papi_metrics,
bool yielded) {
bool yielded, uint64_t thread_id) {
_mtx.lock();
_profile.accumulated += increase;
_profile.stops = _profile.stops + 1.0;
Expand All @@ -118,17 +122,21 @@ class profile {
if (!yielded) {
_profile.calls = _profile.calls + 1.0;
}
thread_ids.insert(thread_id);
_profile.num_threads = thread_ids.size();
_mtx.unlock();
}
void increment(double increase, int num_metrics, double * papi_metrics,
double allocations, double frees, double bytes_allocated, double bytes_freed,
bool yielded) {
increment(increase, num_metrics, papi_metrics, yielded);
bool yielded, uint64_t thread_id) {
increment(increase, num_metrics, papi_metrics, yielded, thread_id);
_mtx.lock();
_profile.allocations += allocations;
_profile.frees += frees;
_profile.bytes_allocated += bytes_allocated;
_profile.bytes_freed += bytes_freed;
thread_ids.insert(thread_id);
_profile.num_threads = thread_ids.size();
_mtx.unlock();
}
void reset() {
Expand All @@ -140,10 +148,16 @@ class profile {
_profile.minimum = 0.0;
_profile.maximum = 0.0;
_profile.times_reset++;
_profile.num_threads = 1;
thread_ids.clear();
_mtx.unlock();
};
double get_calls() { return _profile.calls; }
double get_stops() { return _profile.stops; }
double get_calls() {
return _profile.calls;
}
double get_stops() {
return _profile.stops;
}
double get_mean() {
return (get_accumulated() / _profile.calls);
}
Expand All @@ -154,13 +168,16 @@ class profile {
return (get_accumulated_seconds() / _profile.calls);
}
double get_accumulated() {
return (_profile.accumulated);
return _profile.accumulated;
}
double get_accumulated_mean_threads() {
return (_profile.accumulated / (double)(_profile.num_threads));
}
double get_accumulated_useconds() {
return (_profile.accumulated * 1.0e-3);
return (get_accumulated() * 1.0e-3);
}
double get_accumulated_seconds() {
return (_profile.accumulated * 1.0e-9);
return (get_accumulated() * 1.0e-9);
}
double * get_papi_metrics() { return (_profile.papi_metrics); }
double get_minimum() {
Expand Down Expand Up @@ -188,6 +205,5 @@ class profile {

};


}

44 changes: 24 additions & 20 deletions src/apex/profile_reducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <limits>
#include <inttypes.h>

constexpr size_t num_fields{17};

#if !defined(HPX_HAVE_NETWORKING) && defined(APEX_HAVE_MPI)
#include "mpi.h"
#endif
Expand Down Expand Up @@ -137,8 +139,8 @@ std::map<std::string, apex_profile*> reduce_profiles() {
//sort(all_names.begin(), all_names.end());

// There are 8 "values" and 8 possible papi counters
sbuf_length = all_names.size() * 16;
rbuf_length = all_names.size() * 16 * commsize;
sbuf_length = all_names.size() * num_fields;
rbuf_length = all_names.size() * num_fields * commsize;
//DEBUG_PRINT("%d Sending %" PRIu64 " bytes\n", commrank, sbuf_length * sizeof(double));
double * s_pdata = (double*)calloc(sbuf_length, sizeof(double));
double * r_pdata = nullptr;
Expand All @@ -162,19 +164,20 @@ std::map<std::string, apex_profile*> reduce_profiles() {
dptr[5] = p->maximum;
dptr[6] = p->times_reset;
dptr[7] = (double)p->type;
dptr[8] = p->num_threads;
if (p->type == APEX_TIMER) {
dptr[8] = p->papi_metrics[0];
dptr[9] = p->papi_metrics[1];
dptr[10] = p->papi_metrics[2];
dptr[11] = p->papi_metrics[3];
dptr[12] = p->papi_metrics[4];
dptr[13] = p->papi_metrics[5];
dptr[14] = p->papi_metrics[6];
dptr[15] = p->papi_metrics[7];
dptr[9] = p->papi_metrics[0];
dptr[10] = p->papi_metrics[1];
dptr[11] = p->papi_metrics[2];
dptr[12] = p->papi_metrics[3];
dptr[13] = p->papi_metrics[4];
dptr[14] = p->papi_metrics[5];
dptr[15] = p->papi_metrics[6];
dptr[16] = p->papi_metrics[7];
}
}
}
dptr = &(dptr[16]);
dptr = &(dptr[num_fields]);
}

/* Reduce the data */
Expand Down Expand Up @@ -215,17 +218,18 @@ std::map<std::string, apex_profile*> reduce_profiles() {
p->minimum = dptr[4] < p->minimum ? dptr[4] : p->minimum;
p->maximum = dptr[5] > p->maximum ? dptr[5] : p->maximum;
p->times_reset += dptr[6];
p->num_threads = dptr[8] > p->num_threads ? dptr[8] : p->num_threads;
if (p->type == APEX_TIMER) {
p->papi_metrics[0] += dptr[8];
p->papi_metrics[1] += dptr[9];
p->papi_metrics[2] += dptr[10];
p->papi_metrics[3] += dptr[11];
p->papi_metrics[4] += dptr[12];
p->papi_metrics[5] += dptr[13];
p->papi_metrics[6] += dptr[14];
p->papi_metrics[7] += dptr[15];
p->papi_metrics[0] += dptr[9];
p->papi_metrics[1] += dptr[10];
p->papi_metrics[2] += dptr[11];
p->papi_metrics[3] += dptr[12];
p->papi_metrics[4] += dptr[13];
p->papi_metrics[5] += dptr[14];
p->papi_metrics[6] += dptr[15];
p->papi_metrics[7] += dptr[16];
}
dptr = &(dptr[16]);
dptr = &(dptr[17]);
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/apex/profiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class profiler {
bool is_resume; // for yield or resume
reset_type is_reset;
bool stopped;
// needed for correct Hatchet output
uint64_t thread_id;
task_identifier * get_task_id(void) {
return task_id;
}
Expand Down Expand Up @@ -103,7 +105,7 @@ class profiler {
guid(0),
is_counter(false),
is_resume(resume),
is_reset(reset), stopped(false) { };
is_reset(reset), stopped(false), thread_id(0) { };
// this constructor is for counters
profiler(task_identifier * id, double value_) :
task_id(id),
Expand Down Expand Up @@ -135,7 +137,8 @@ class profiler {
is_counter(in.is_counter),
is_resume(in.is_resume), // for yield or resume
is_reset(in.is_reset),
stopped(in.stopped)
stopped(in.stopped),
thread_id(in.thread_id)
{
//printf("COPY!\n"); fflush(stdout);
#if APEX_HAVE_PAPI
Expand Down
7 changes: 4 additions & 3 deletions src/apex/profiler_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ std::unordered_set<profile*> free_profiles;
if (apex_options::track_memory()) {
theprofile->increment(p.elapsed(), tmp_num_counters,
values, p.allocations, p.frees, p.bytes_allocated,
p.bytes_freed, p.is_resume);
p.bytes_freed, p.is_resume, p.thread_id);
} else {
theprofile->increment(p.elapsed(), tmp_num_counters,
values, p.is_resume);
values, p.is_resume, p.thread_id);
}
}
#if defined(APEX_THROTTLE)
Expand Down Expand Up @@ -419,7 +419,7 @@ std::unordered_set<profile*> free_profiles;
}
}
if (apex_options::use_tasktree_output() && !p.is_counter && p.tt_ptr != nullptr) {
p.tt_ptr->tree_node->addAccumulated(p.elapsed_seconds(), p.is_resume);
p.tt_ptr->tree_node->addAccumulated(p.elapsed_seconds(), p.is_resume, p.thread_id);
}
return 1;
}
Expand Down Expand Up @@ -1862,6 +1862,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
}
}
#endif
p->thread_id = _pls.my_tid;
#ifdef APEX_SYNCHRONOUS_PROCESSING
push_profiler(_pls.my_tid, *p);
#else // APEX_SYNCHRONOUS_PROCESSING
Expand Down

0 comments on commit 75e59ff

Please sign in to comment.