Skip to content

Commit

Permalink
Adding general metrics for tasktree nodes!
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed Feb 1, 2023
1 parent 907db72 commit 90532db
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 1 deletion.
79 changes: 79 additions & 0 deletions src/apex/apex_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#endif

#include "apex_api.hpp"
#include "memory_wrapper.hpp"
#include "apex_error_handling.hpp"
#if defined(APEX_HAVE_MPI) || \
(defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_MPI))
Expand Down Expand Up @@ -161,6 +162,31 @@ void _symbol( MPI_Fint *ierr ) { \
apex::sample_value(name, bytes);
return bytes;
}
/* Byte estimate for a collective buffer that holds `count` elements of
 * `datatype` for every rank in `comm`: size(datatype) * count * size(comm).
 * The result is sampled under the name "Bytes : <function>" and returned. */
inline double getBytesTransferred2(const int count, MPI_Datatype datatype, MPI_Comm comm, const char * function) {
    int element_bytes = 0;
    PMPI_Type_size(datatype, &element_bytes);
    int rank_count = 0;
    PMPI_Comm_size(comm, &rank_count);
    // Each factor is widened to double before multiplying, so the product
    // is never computed in int arithmetic.
    const double total = static_cast<double>(element_bytes)
        * static_cast<double>(count)
        * static_cast<double>(rank_count);
    std::string counter_name("Bytes : ");
    counter_name += function;
    apex::sample_value(counter_name, total);
    return total;
}
/* Byte total for a "v"-style collective buffer: the sum over every rank i in
 * `comm` of size(datatype) * count[i]. The result is sampled under the name
 * "Bytes : <function>" and returned.
 *
 * `count` may be null. MPI only requires the per-rank counts array on ranks
 * where the receive arguments are significant (e.g. the root of MPI_Gatherv),
 * so other ranks are allowed to pass NULL. In that case nothing is received
 * through this buffer: no sample is recorded and 0.0 is returned.
 * NOTE(review): a non-null but uninitialised counts array on a non-root rank
 * cannot be detected here; callers that know the root should skip the call. */
inline double getBytesTransferred3(const int * count, MPI_Datatype datatype, MPI_Comm comm, const char * function) {
    if (count == nullptr) {
        return 0.0;
    }
    int typesize = 0;
    int commsize = 0;
    PMPI_Type_size( datatype, &typesize );
    PMPI_Comm_size( comm, &commsize );
    const double element_bytes = static_cast<double>(typesize);
    double bytes = 0.0;
    for (int i = 0 ; i < commsize ; ++i) {
        bytes += element_bytes * static_cast<double>(count[i]);
    }
    std::string name("Bytes : ");
    name.append(function);
    apex::sample_value(name, bytes);
    return bytes;
}
inline void getBandwidth(double bytes, std::shared_ptr<apex::task_wrapper> task, const char * function) {
if ((task != nullptr) && (task->prof != nullptr)) {
std::string name("BW (Bytes/second) : ");
Expand All @@ -176,6 +202,7 @@ void _symbol( MPI_Fint *ierr ) { \
double bytes = getBytesTransferred(count, datatype, "MPI_Isend");
/* start the timer */
MPI_START_TIMER
apex::recordMetric("Send Bytes", bytes);
/* sample the bytes */
int retval = PMPI_Isend(buf, count, datatype, dest, tag, comm, request);
MPI_STOP_TIMER
Expand All @@ -202,6 +229,7 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * des
/* Get the byte count */
double bytes = getBytesTransferred(count, datatype, "MPI_Irecv");
MPI_START_TIMER
apex::recordMetric("Recv Bytes", bytes);
int retval = PMPI_Irecv(buf, count, datatype, source, tag, comm,
request);
MPI_STOP_TIMER
Expand Down Expand Up @@ -229,6 +257,7 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * sou
double bytes = getBytesTransferred(count, datatype, "MPI_Send");
/* start the timer */
MPI_START_TIMER
apex::recordMetric("Send Bytes", bytes);
/* sample the bytes */
int retval = PMPI_Send(buf, count, datatype, dest, tag, comm);
MPI_STOP_TIMER
Expand All @@ -253,6 +282,7 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * des
/* Get the byte count */
double bytes = getBytesTransferred(count, datatype, "MPI_Recv");
MPI_START_TIMER
apex::recordMetric("Recv Bytes", bytes);
int retval = PMPI_Recv(buf, count, datatype, source, tag, comm, status);
MPI_STOP_TIMER
/* record the bandwidth */
Expand Down Expand Up @@ -285,7 +315,12 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * sou
}
int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
/* Get the byte count */
double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Gather sendbuf");
double rbytes = getBytesTransferred2(recvcount, recvtype, comm, "MPI_Gather recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Gather(sendbuf, sendcount, sendtype, recvbuf,
recvcount, recvtype, root, comm);
Expand All @@ -311,7 +346,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcnt, MPI_Fint *sendtype, void * recvb

int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
/* Get the byte count */
double sbytes = getBytesTransferred(count, datatype, "MPI_Allreduce sendbuf");
double rbytes = getBytesTransferred2(count, datatype, comm, "MPI_Allreduce recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm);
MPI_STOP_TIMER
Expand All @@ -335,7 +375,12 @@ void _symbol(void * sendbuf, void * recvbuf, MPI_Fint *count, MPI_Fint *datatype

int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, int root, MPI_Comm comm) {
/* Get the byte count */
double sbytes = getBytesTransferred(count, datatype, "MPI_Reduce sendbuf");
double rbytes = getBytesTransferred2(count, datatype, comm, "MPI_Reduce recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm);
MPI_STOP_TIMER
Expand All @@ -360,7 +405,16 @@ void _symbol(void * sendbuf, void * recvbuf, MPI_Fint *count, MPI_Fint *datatype

int MPI_Bcast( void *buffer, int count, MPI_Datatype datatype, int root,
MPI_Comm comm ) {
//int commrank;
//PMPI_Comm_rank(comm, &commrank);
/* Get the byte count */
double sbytes = getBytesTransferred(count, datatype, "MPI_Bcast");
MPI_START_TIMER
//if (root == commrank) {
apex::recordMetric("Send Bytes", sbytes);
//} else {
//apex::recordMetric("Recv Bytes", sbytes);
//}
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Bcast(buffer, count, datatype, root, comm );
MPI_STOP_TIMER
Expand Down Expand Up @@ -409,7 +463,12 @@ void _symbol(MPI_Fint *count, MPI_Fint * array_of_requests, MPI_Fint *ierr) { \

int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
/* Get the byte count */
double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Alltoall sendbuf");
double rbytes = getBytesTransferred2(recvcount, recvtype, comm, "MPI_Alltoall recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Alltoall(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
MPI_STOP_TIMER
Expand All @@ -434,7 +493,12 @@ MPI_Fint *recvcnt, MPI_Fint *recvtype, MPI_Fint *comm, MPI_Fint *ierr) { \

int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
/* Get the byte count */
double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Allgather sendbuf");
double rbytes = getBytesTransferred2(recvcount, recvtype, comm, "MPI_Allgather recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
MPI_STOP_TIMER
Expand All @@ -461,7 +525,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcount, MPI_Fint *sendtype, void * rec
int MPI_Allgatherv(const void* buffer_send, int count_send, MPI_Datatype datatype_send,
void* buffer_recv, const int* counts_recv, const int* displacements,
MPI_Datatype datatype_recv, MPI_Comm communicator) {
/* Get the byte count */
double sbytes = getBytesTransferred(count_send, datatype_send, "MPI_Allgatherv sendbuf");
double rbytes = getBytesTransferred3(counts_recv, datatype_recv, communicator, "MPI_Allgatherv recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(communicator, __APEX_FUNCTION__, p);
int retval = PMPI_Allgatherv(buffer_send, count_send, datatype_send,
buffer_recv, counts_recv, displacements, datatype_recv, communicator);
Expand All @@ -488,7 +557,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcount, MPI_Fint *sendtype, void * rec
int MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, const int *recvcounts, const int *displs,
MPI_Datatype recvtype, int root, MPI_Comm comm) {
/* Get the byte count */
double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Gatherv sendbuf");
double rbytes = getBytesTransferred3(recvcounts, recvtype, comm, "MPI_Gatherv recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm);
MPI_STOP_TIMER
Expand All @@ -514,7 +588,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcnt, MPI_Fint *sendtype, void * recvb
int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
int dest, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype,
int source, int recvtag, MPI_Comm comm, MPI_Status * status) {
/* Get the byte count */
double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Sendrecv sendbuf");
double rbytes = getBytesTransferred(recvcount, recvtype, "MPI_Sendrecv recvbuf");
MPI_START_TIMER
apex::recordMetric("Send Bytes", sbytes);
apex::recordMetric("Recv Bytes", rbytes);
apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p);
int retval = PMPI_Sendrecv(sendbuf, sendcount, sendtype, dest, sendtag, recvbuf, recvcount, recvtype,
source, recvtag, comm, status);
Expand Down
28 changes: 28 additions & 0 deletions src/apex/dependency_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace dependency {
// declare an instance of the statics
std::mutex Node::treeMutex;
std::atomic<size_t> Node::nodeCount{0};
std::set<std::string> Node::known_metrics;

Node* Node::appendChild(task_identifier* c) {
treeMutex.lock();
Expand Down Expand Up @@ -356,6 +357,14 @@ double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id)
double variance = std::max(0.0,((getSumSquares() / ncalls) - (mean * mean)));
double stddev = sqrt(variance);
outfile << stddev;
// write any available metrics
for (auto& x : known_metrics) {
if (metric_map.find(x) == metric_map.end()) {
outfile << ",0";
} else {
outfile << "," << metric_map[x];
}
}
// end the line
outfile << std::endl;

Expand All @@ -377,6 +386,25 @@ double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id)
return acc;
}

/* Fold a set of named metric values into this tree node.
 *
 * For every (name, value) pair in `_metric_map`:
 *   - the name is registered in the class-wide `known_metrics` set, and
 *   - the value is added to this node's `metric_map` entry for that name
 *     (a missing entry starts from 0.0).
 *
 * All writes happen under one function-local static mutex, taken once per
 * call with a scoped guard so it is released on every exit path. An empty
 * input returns before touching the mutex.
 * NOTE(review): this mutex only serialises writers that go through
 * addMetrics(); code that reads known_metrics / metric_map elsewhere does not
 * take it - confirm those reads only happen after profiling has stopped. */
void Node::addMetrics(std::map<std::string, double>& _metric_map) {
    if (_metric_map.empty()) {
        return;
    }
    static std::mutex m;
    std::lock_guard<std::mutex> guard(m);
    for (const auto& entry : _metric_map) {
        // std::set::insert leaves the set untouched when the name is present.
        known_metrics.insert(entry.first);
        // operator[] value-initialises a missing entry to 0.0 before the add.
        metric_map[entry.first] += entry.second;
    }
}

} // dependency_tree

} // apex
8 changes: 8 additions & 0 deletions src/apex/dependency_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <fstream>
#include <atomic>
#include <set>
#include <map>
#include "apex_types.h"
#include "task_identifier.hpp"

Expand All @@ -35,8 +36,11 @@ class Node {
size_t index;
std::set<uint64_t> thread_ids;
std::unordered_map<task_identifier, Node*> children;
// map for arbitrary metrics
std::map<std::string, double> metric_map;
static std::mutex treeMutex;
static std::atomic<size_t> nodeCount;
static std::set<std::string> known_metrics;
public:
Node(task_identifier* id, Node* p) :
data(id), parent(p), count(1), inclusive(0),
Expand Down Expand Up @@ -75,6 +79,10 @@ class Node {
// Current value of the class-wide atomic nodeCount counter.
static size_t getNodeCount() {
    return nodeCount.load();
}
// Merge the given name -> value pairs into this node: each value is added to
// this node's metric_map entry for that name (created if absent), and each
// name is registered in the static known_metrics set.
// Note: the parameter name used in this declaration is the same as the
// metric_map member; the out-of-line definition names it _metric_map.
void addMetrics(std::map<std::string, double>& metric_map);
// Accessor for the class-wide set of metric names. Returns a mutable
// reference to the static known_metrics set itself (no copy is made and no
// lock is taken here).
// NOTE(review): addMetrics() inserts into this set under its own mutex;
// presumably callers only iterate it once no more inserts can happen - confirm.
static std::set<std::string>& getKnownMetrics() {
return known_metrics;
}
};

} // dependency_tree
Expand Down
8 changes: 8 additions & 0 deletions src/apex/memory_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ void recordFree(void* ptr, bool cpu) {
if (cpu) sample_value("Memory: Total Bytes Occupied", value);
}

/* This doesn't belong here, but whatevs */
void recordMetric(std::string name, double value) {
profiler * p = thread_instance::instance().get_current_profiler();
if (p != nullptr) {
p->metric_map[name] = value;
}
}

// Comparator function to sort pairs descending, according to second value
bool cmp(std::pair<void*, record_t>& a,
std::pair<void*, record_t>& b)
Expand Down
1 change: 1 addition & 0 deletions src/apex/memory_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ book_t& getBook(void);
void printBacktrace(void);
void recordAlloc(size_t bytes, void* ptr, allocator_t alloc, bool cpu = true);
void recordFree(void* ptr, bool cpu = false);
void recordMetric(std::string name, double value);

}; // apex namespace

2 changes: 2 additions & 0 deletions src/apex/profiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class profiler;
#include <sstream>
#include <math.h>
#include <memory>
#include <map>
#include "apex_options.hpp"
#include "apex_types.h"
#include "apex_assert.h"
Expand Down Expand Up @@ -61,6 +62,7 @@ class profiler {
bool stopped;
// needed for correct Hatchet output
uint64_t thread_id;
std::map<std::string, double> metric_map;
// Accessor: returns this profiler's task_id pointer as-is.
task_identifier * get_task_id(void) {
return task_id;
}
Expand Down
7 changes: 6 additions & 1 deletion src/apex/profiler_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ std::unordered_set<profile*> free_profiles;
}
if (apex_options::use_tasktree_output() && !p.is_counter && p.tt_ptr != nullptr) {
p.tt_ptr->tree_node->addAccumulated(p.elapsed_seconds(), p.inclusive_seconds(), p.is_resume, p.thread_id);
p.tt_ptr->tree_node->addMetrics(p.metric_map);
}
return 1;
}
Expand Down Expand Up @@ -1176,7 +1177,11 @@ std::unordered_set<profile*> free_profiles;
tree_stream << "\"process rank\",\"node index\",\"parent index\",\"depth\",";
tree_stream << "\"name\",\"calls\",\"threads\",\"accumulated\",";
tree_stream << "\"minimum\",\"mean\",\"maximum\",";
tree_stream << "\"sumsqr\"\n";
tree_stream << "\"sumsqr\"";
for (auto& x : dependency::Node::getKnownMetrics()) {
tree_stream << ",\"" << x << "\"";
}
tree_stream << "\n";
}
root->tree_node->writeNodeCSV(tree_stream, wall_clock_main, node_id);
std::string filename{"apex_tasktree.csv"};
Expand Down

0 comments on commit 90532db

Please sign in to comment.