diff --git a/src/apex/apex_mpi.cpp b/src/apex/apex_mpi.cpp index 13559471..aefd440c 100644 --- a/src/apex/apex_mpi.cpp +++ b/src/apex/apex_mpi.cpp @@ -16,6 +16,7 @@ #endif #include "apex_api.hpp" +#include "memory_wrapper.hpp" #include "apex_error_handling.hpp" #if defined(APEX_HAVE_MPI) || \ (defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_MPI)) @@ -161,6 +162,31 @@ void _symbol( MPI_Fint *ierr ) { \ apex::sample_value(name, bytes); return bytes; } + inline double getBytesTransferred2(const int count, MPI_Datatype datatype, MPI_Comm comm, const char * function) { + int typesize = 0; + int commsize = 0; + PMPI_Type_size( datatype, &typesize ); + PMPI_Comm_size( comm, &commsize ); + double bytes = (double)(typesize) * (double)(count) * (double)commsize; + std::string name("Bytes : "); + name.append(function); + apex::sample_value(name, bytes); + return bytes; + } + inline double getBytesTransferred3(const int * count, MPI_Datatype datatype, MPI_Comm comm, const char * function) { + int typesize = 0; + int commsize = 0; + PMPI_Type_size( datatype, &typesize ); + PMPI_Comm_size( comm, &commsize ); + double bytes = 0; + for(int i = 0 ; i < commsize ; i++) { + bytes += ((double)(typesize) * (double)(count[i])); + } + std::string name("Bytes : "); + name.append(function); + apex::sample_value(name, bytes); + return bytes; + } inline void getBandwidth(double bytes, std::shared_ptr task, const char * function) { if ((task != nullptr) && (task->prof != nullptr)) { std::string name("BW (Bytes/second) : "); @@ -176,6 +202,7 @@ void _symbol( MPI_Fint *ierr ) { \ double bytes = getBytesTransferred(count, datatype, "MPI_Isend"); /* start the timer */ MPI_START_TIMER + apex::recordMetric("Send Bytes", bytes); /* sample the bytes */ int retval = PMPI_Isend(buf, count, datatype, dest, tag, comm, request); MPI_STOP_TIMER @@ -202,6 +229,7 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * des /* Get the byte count */ double bytes = getBytesTransferred(count, datatype, "MPI_Irecv"); MPI_START_TIMER + apex::recordMetric("Recv Bytes", bytes); int retval = PMPI_Irecv(buf, count, datatype, source, tag, comm, request); MPI_STOP_TIMER @@ -229,6 +257,7 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * sou double bytes = getBytesTransferred(count, datatype, "MPI_Send"); /* start the timer */ MPI_START_TIMER + apex::recordMetric("Send Bytes", bytes); /* sample the bytes */ int retval = PMPI_Send(buf, count, datatype, dest, tag, comm); MPI_STOP_TIMER @@ -253,6 +282,7 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * des /* Get the byte count */ double bytes = getBytesTransferred(count, datatype, "MPI_Recv"); MPI_START_TIMER + apex::recordMetric("Recv Bytes", bytes); int retval = PMPI_Recv(buf, count, datatype, source, tag, comm, status); MPI_STOP_TIMER /* record the bandwidth */ @@ -285,7 +315,12 @@ void _symbol( void * buf, MPI_Fint * count, MPI_Fint * datatype, MPI_Fint * sou } int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { + /* Get the byte count */ + double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Gather sendbuf"); + double rbytes = getBytesTransferred2(recvcount, recvtype, comm, "MPI_Gather recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm); @@ -311,7 +346,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcnt, MPI_Fint *sendtype, void * recvb int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { + /* Get the byte count */ + double sbytes = getBytesTransferred(count, datatype, "MPI_Allreduce sendbuf"); + double rbytes = getBytesTransferred2(count, datatype, comm, "MPI_Allreduce recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm); MPI_STOP_TIMER @@ -335,7 +375,12 @@ void _symbol(void * sendbuf, void * recvbuf, MPI_Fint *count, MPI_Fint *datatype int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { + /* Get the byte count */ + double sbytes = getBytesTransferred(count, datatype, "MPI_Reduce sendbuf"); + double rbytes = getBytesTransferred2(count, datatype, comm, "MPI_Reduce recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm); MPI_STOP_TIMER @@ -360,7 +405,16 @@ void _symbol(void * sendbuf, void * recvbuf, MPI_Fint *count, MPI_Fint *datatype int MPI_Bcast( void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm ) { + //int commrank; + //PMPI_Comm_rank(comm, &commrank); + /* Get the byte count */ + double sbytes = getBytesTransferred(count, datatype, "MPI_Bcast"); MPI_START_TIMER + //if (root == commrank) { + apex::recordMetric("Send Bytes", sbytes); + //} else { + //apex::recordMetric("Recv Bytes", sbytes); + //} apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Bcast(buffer, count, datatype, root, comm ); MPI_STOP_TIMER @@ -409,7 +463,12 @@ void _symbol(MPI_Fint *count, MPI_Fint * array_of_requests, MPI_Fint *ierr) { \ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + /* Get the byte count */ + double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Alltoall sendbuf"); + double rbytes = getBytesTransferred2(recvcount, recvtype, comm, "MPI_Alltoall recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Alltoall(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); MPI_STOP_TIMER @@ -434,7 +493,12 @@ MPI_Fint *recvcnt, MPI_Fint *recvtype, MPI_Fint *comm, MPI_Fint *ierr) { \ int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { + /* Get the byte count */ + double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Allgather sendbuf"); + double rbytes = getBytesTransferred2(recvcount, recvtype, comm, "MPI_Allgather recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); MPI_STOP_TIMER @@ -461,7 +525,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcount, MPI_Fint *sendtype, void * rec int MPI_Allgatherv(const void* buffer_send, int count_send, MPI_Datatype datatype_send, void* buffer_recv, const int* counts_recv, const int* displacements, MPI_Datatype datatype_recv, MPI_Comm communicator) { + /* Get the byte count */ + double sbytes = getBytesTransferred(count_send, datatype_send, "MPI_Allgatherv sendbuf"); + double rbytes = getBytesTransferred3(counts_recv, datatype_recv, communicator, "MPI_Allgatherv recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(communicator, __APEX_FUNCTION__, p); int retval = PMPI_Allgatherv(buffer_send, count_send, datatype_send, buffer_recv, counts_recv, displacements, datatype_recv, communicator); @@ -488,7 +557,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcount, MPI_Fint *sendtype, void * rec int MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int *recvcounts, const int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm) { + /* Get the byte count */ + double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Gatherv sendbuf"); + double rbytes = getBytesTransferred3(recvcounts, recvtype, comm, "MPI_Gatherv recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm); MPI_STOP_TIMER @@ -514,7 +588,12 @@ void _symbol(void * sendbuf, MPI_Fint *sendcnt, MPI_Fint *sendtype, void * recvb int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status * status) { + /* Get the byte count */ + double sbytes = getBytesTransferred(sendcount, sendtype, "MPI_Sendrecv sendbuf"); + double rbytes = getBytesTransferred(recvcount, recvtype, "MPI_Sendrecv recvbuf"); MPI_START_TIMER + apex::recordMetric("Send Bytes", sbytes); + apex::recordMetric("Recv Bytes", rbytes); apex_measure_mpi_sync(comm, __APEX_FUNCTION__, p); int retval = PMPI_Sendrecv(sendbuf, sendcount, sendtype, dest, sendtag, recvbuf, recvcount, recvtype, source, recvtag, comm, status); diff --git a/src/apex/dependency_tree.cpp b/src/apex/dependency_tree.cpp index 0f058c77..fcc4ea60 100644 --- a/src/apex/dependency_tree.cpp +++ b/src/apex/dependency_tree.cpp @@ -21,6 +21,7 @@ namespace dependency { // declare an instance of the statics std::mutex Node::treeMutex; std::atomic Node::nodeCount{0}; +std::set Node::known_metrics; Node* Node::appendChild(task_identifier* c) { treeMutex.lock(); @@ -356,6 +357,14 @@ double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id) double variance = std::max(0.0,((getSumSquares() / ncalls) - (mean * mean))); double stddev = sqrt(variance); outfile << stddev; + // write any available metrics + for (auto& x : known_metrics) { + if (metric_map.find(x) == metric_map.end()) { + outfile << ",0"; + } else { + outfile << "," << metric_map[x]; + } + } // end the line outfile << std::endl; @@ -377,6 +386,25 @@ double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id) return acc; } +void Node::addMetrics(std::map& _metric_map) { + static std::mutex m; + for (auto& x: _metric_map) { + std::cout << x.first << " => " << x.second << '\n'; + if (known_metrics.find(x.first) == known_metrics.end()) { + m.lock(); + known_metrics.insert(x.first); + m.unlock(); + } + m.lock(); + if (metric_map.find(x.first) == metric_map.end()) { + metric_map[x.first] = x.second; + } else { + metric_map[x.first] += x.second; + } + m.unlock(); + } +} + } // dependency_tree } // apex diff --git a/src/apex/dependency_tree.hpp b/src/apex/dependency_tree.hpp index 4ed6106e..ef9a1ee3 100644 --- a/src/apex/dependency_tree.hpp +++ b/src/apex/dependency_tree.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "apex_types.h" #include "task_identifier.hpp" @@ -35,8 +36,11 @@ class Node { size_t index; std::set thread_ids; std::unordered_map children; + // map for arbitrary metrics + std::map metric_map; static std::mutex treeMutex; static std::atomic nodeCount; + static std::set known_metrics; public: Node(task_identifier* id, Node* p) : data(id), parent(p), count(1), inclusive(0), @@ -75,6 +79,10 @@ class Node { static size_t getNodeCount() { return nodeCount; } + void addMetrics(std::map& metric_map); + static std::set& getKnownMetrics() { + return known_metrics; + } }; } // dependency_tree diff --git a/src/apex/memory_wrapper.cpp b/src/apex/memory_wrapper.cpp index 883b5844..297bd452 100644 --- a/src/apex/memory_wrapper.cpp +++ b/src/apex/memory_wrapper.cpp @@ -162,6 +162,14 @@ void recordFree(void* ptr, bool cpu) { if (cpu) sample_value("Memory: Total Bytes Occupied", value); } +/* This doesn't belong here, but whatevs */ +void recordMetric(std::string name, double value) { + profiler * p = thread_instance::instance().get_current_profiler(); + if (p != nullptr) { + p->metric_map[name] = value; + } +} + // Comparator function to sort pairs descending, according to second value bool cmp(std::pair& a, std::pair& b) diff --git a/src/apex/memory_wrapper.hpp b/src/apex/memory_wrapper.hpp index 6e88656b..0b0a6b77 100644 --- a/src/apex/memory_wrapper.hpp +++ b/src/apex/memory_wrapper.hpp @@ -63,6 +63,7 @@ book_t& getBook(void); void printBacktrace(void); void recordAlloc(size_t bytes, void* ptr, allocator_t alloc, bool cpu = true); void recordFree(void* ptr, bool cpu = false); +void recordMetric(std::string name, double value); }; // apex namespace diff --git a/src/apex/profiler.hpp b/src/apex/profiler.hpp index d4df0e1b..536e1b68 100644 --- a/src/apex/profiler.hpp +++ b/src/apex/profiler.hpp @@ -17,6 +17,7 @@ class profiler; #include #include #include +#include #include "apex_options.hpp" #include "apex_types.h" #include "apex_assert.h" @@ -61,6 +62,7 @@ class profiler { bool stopped; // needed for correct Hatchet output uint64_t thread_id; + std::map metric_map; task_identifier * get_task_id(void) { return task_id; } diff --git a/src/apex/profiler_listener.cpp b/src/apex/profiler_listener.cpp index 8a6dabf4..d0605cfb 100644 --- a/src/apex/profiler_listener.cpp +++ b/src/apex/profiler_listener.cpp @@ -425,6 +425,7 @@ std::unordered_set free_profiles; } if (apex_options::use_tasktree_output() && !p.is_counter && p.tt_ptr != nullptr) { p.tt_ptr->tree_node->addAccumulated(p.elapsed_seconds(), p.inclusive_seconds(), p.is_resume, p.thread_id); + p.tt_ptr->tree_node->addMetrics(p.metric_map); } return 1; } @@ -1176,7 +1177,11 @@ std::unordered_set free_profiles; tree_stream << "\"process rank\",\"node index\",\"parent index\",\"depth\","; tree_stream << "\"name\",\"calls\",\"threads\",\"accumulated\","; tree_stream << "\"minimum\",\"mean\",\"maximum\","; - tree_stream << "\"sumsqr\"\n"; + tree_stream << "\"sumsqr\""; + for (auto& x : dependency::Node::getKnownMetrics()) { + tree_stream << ",\"" << x << "\""; + } + tree_stream << "\n"; } root->tree_node->writeNodeCSV(tree_stream, wall_clock_main, node_id); std::string filename{"apex_tasktree.csv"};