Skip to content

Commit

Permalink
Working MPI clock sync for OTF2.
Browse files Browse the repository at this point in the history
Still need to test and fully implement the HPX version.
In this implementation, all ranks determine a latency between rank 0
and themselves, then compute a clock drift between the two.  That
drift is added to the archive file creation time as an absolute
reference that all ranks can use.
  • Loading branch information
khuck committed Jul 23, 2021
1 parent c01f1e6 commit 0cd203c
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/apex/CMakeLists.hpx
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ include(APEX_SetupMSR)
include(APEX_SetupOTF2)
if(APEX_WITH_OTF2)
set(otf2_headers otf2_listener.hpp)
set(otf2_sources otf2_listener.cpp)
set(otf2_sources otf2_listener.cpp otf2_listener_hpx.cpp)
endif()

if(HPX_WITH_HPXMP)
Expand Down
2 changes: 1 addition & 1 deletion src/apex/CMakeLists.standalone
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ SET(SENSOR_SOURCE sensor_data.cpp)
endif(LM_SENSORS_FOUND)

if (OTF2_FOUND)
SET(OTF2_SOURCE otf2_listener.cpp)
SET(OTF2_SOURCE otf2_listener.cpp otf2_listener_mpi.cpp otf2_listener_nompi.cpp)
endif(OTF2_FOUND)

if (CUPTI_FOUND)
Expand Down
18 changes: 12 additions & 6 deletions src/apex/otf2_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,6 @@ namespace apex {
otf2_listener::otf2_listener (void) : _terminate(false),
_initialized(false), comm_evt_writer(nullptr),
global_def_writer(nullptr), dropped(0) {
/* get a start time for the trace */
globalOffset = get_time();
/* set the flusher */
flush_callbacks.otf2_pre_flush = otf2_listener::pre_flush;
flush_callbacks.otf2_post_flush = otf2_listener::post_flush;
Expand Down Expand Up @@ -528,7 +526,7 @@ namespace apex {
stringstream tmp;
tmp << "APEX version " << version();
OTF2_EC(OTF2_Archive_SetCreator(archive, tmp.str().c_str()));
#if defined(APEX_HAVE_MPI_disabled)
#if defined(APEX_HAVE_MPI)
OTF2_MPI_Archive_SetCollectiveCallbacks(archive,
MPI_COMM_WORLD, MPI_COMM_NULL);
#else
Expand All @@ -549,11 +547,11 @@ namespace apex {
get_string_index(empty);
// save the node id, because the apex object may not be
// around when we are finalizing everything.
my_saved_node_id = apex::instance()->get_node_id();
my_saved_node_count = apex::instance()->get_num_ranks();
/*
my_saved_node_id = getCommRank();
my_saved_node_count = getCommSize();
cout << "Rank " << my_saved_node_id << " of "
<< my_saved_node_count << "." << endl;
/*
*/
// now is a good time to make sure the archive is open on this
// rank/locality
Expand All @@ -565,6 +563,14 @@ namespace apex {
std::cerr << "Archive not created!" << std::endl; fflush(stderr);
return;
}
// synchronize global time offset based on archive creation time
struct stat stat_buf;
if(stat(apex_options::otf2_archive_path(), &stat_buf) == 0) {
/* get a start time for the trace, relative to when
* the archive was created. */
globalOffset = (stat_buf.st_mtim.tv_sec * 1000000000) +
stat_buf.st_mtim.tv_nsec + synchronizeClocks();
}
_initialized = true;
return;
}
Expand Down
3 changes: 3 additions & 0 deletions src/apex/otf2_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ namespace apex {
uint32_t make_vtid (async_thread_node &node);
std::map<uint32_t,uint64_t> last_ts;
uint64_t dropped;
int64_t synchronizeClocks(void);
int getCommRank(void);
int getCommSize(void);
public:
otf2_listener (void);
//~otf2_listener (void) { shutdown_event_data data(my_saved_node_id,0);
Expand Down
20 changes: 17 additions & 3 deletions src/apex/otf2_listener_hpx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ HPX_PLAIN_ACTION(apex::otf2::get_remote_timestamp,
HPX_PLAIN_ACTION(apex::otf2::null_action, apex_otf2_null_action);

namespace apex {
namespace otf2 {

int64_t getClockOffset(void) {
int64_t otf2_listener::getClockOffset(void) {
int64_t offset{0};
uint64_t offset_time{0};

Expand Down Expand Up @@ -67,7 +66,22 @@ int64_t getClockOffset(void) {
return offset;
}

} // namespace otf2
/* Return the HPX locality id of this process.
 * The value is kept in a function-local static; the HPX query is
 * repeated on each call for as long as the stored value is still -1. */
int otf2_listener::getCommRank() {
    static int cached_rank{-1};
    if (cached_rank != -1) {
        return cached_rank;
    }
    cached_rank = hpx::get_locality_id();
    return cached_rank;
}

/* Return the number of HPX localities, queried synchronously.
 * The value is kept in a function-local static; the HPX query is
 * repeated on each call for as long as the stored value is still -1. */
int otf2_listener::getCommSize() {
    static int cached_size{-1};
    if (cached_size != -1) {
        return cached_size;
    }
    cached_size = hpx::get_num_localities(hpx::launch::sync);
    return cached_size;
}

} // namespace apex

#endif
85 changes: 58 additions & 27 deletions src/apex/otf2_listener_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,92 @@
#include "mpi.h"

namespace apex {
namespace otf2 {

int64_t getClockOffset(void) {
int64_t otf2_listener::synchronizeClocks(void) {
int64_t offset{0};
uint64_t offset_time{0};
int rank{0};
int size{0};
int rank{getCommRank()};
int size{getCommSize()};
MPI_Status status;
const int attempts{10};

PMPI_Comm_rank(MPI_COMM_WORLD, &rank);
PMPI_Comm_size(MPI_COMM_WORLD, &size);
// synchronize all ranks
PMPI_Barrier(MPI_COMM_WORLD);

if (rank == 0) {
/* If rank 0, iterate over the other ranks and synchronize */
for (int index = 1 ; index < size ; index++) {
/* Measure how long it takes to send/receive with the worker */
uint64_t latency;
uint64_t before[attempts];
uint64_t after[attempts];
// take a timestamp
uint64_t before = apex::otf2_listener::get_time();
// send an empty message
PMPI_Send(NULL, 0, MPI_INT, index, 1, MPI_COMM_WORLD);
// receive an empty message
PMPI_Recv(NULL, 0, MPI_INT, index, 2, MPI_COMM_WORLD, &status);
// take a timestamp
uint64_t after = apex::otf2_listener::get_time();
for (int i = 0 ; i < attempts ; i++) {
before[i] = otf2_listener::get_time();
// send an empty message
PMPI_Send(NULL, 0, MPI_INT, index, 1, MPI_COMM_WORLD);
// receive an empty message
PMPI_Recv(NULL, 0, MPI_INT, index, 2, MPI_COMM_WORLD, &status);
// take a timestamp
after[i] = otf2_listener::get_time();
}
uint64_t latency = after[0] - before[0];
int my_min{0};
for (int i = 1 ; i < attempts ; i++) {
uint64_t next = after[i] - before[i];
if (next < latency) {
latency = next;
my_min = i;
}
}
// the latency is half of the elapsed time
latency = (after - before) / 2;
latency = (after[my_min] - before[my_min]) / 2;
/* Set the reference time stamp for this worker to the rank 0
"before" time plus the latency. That should match when
the worker took their time stamp, if the two are well synced. */
uint64_t ref_time = before + latency;
PMPI_Send(&ref_time, 1, MPI_UNSIGNED_LONG_LONG, index, 1, MPI_COMM_WORLD);
uint64_t ref_time[2];
ref_time[0] = after[my_min] - latency;
ref_time[1] = my_min;
PMPI_Send(ref_time, 2, MPI_UNSIGNED_LONG_LONG, index, 1, MPI_COMM_WORLD);
offset = 0;
printf("0->%d: Before: %lu After: %lu Latency: %lu\n", index, before[my_min], after[my_min], latency);
}
} else {
/* Measure how long it takes to send/receive with the main rank */
PMPI_Recv(NULL, 0, MPI_INT, 0, 1, MPI_COMM_WORLD, &status);
// take a timestamp now!
uint64_t mytime = apex::otf2_listener::get_time();
PMPI_Send(NULL, 0, MPI_INT, 0, 2, MPI_COMM_WORLD);
uint64_t mytime[attempts];
for (int i = 0 ; i < attempts ; i++) {
/* Measure how long it takes to send/receive with the main rank */
PMPI_Recv(NULL, 0, MPI_INT, 0, 1, MPI_COMM_WORLD, &status);
// take a timestamp now!
mytime[i] = otf2_listener::get_time();
PMPI_Send(NULL, 0, MPI_INT, 0, 2, MPI_COMM_WORLD);
}
// get the reference time from rank 0
uint64_t ref_time;
PMPI_Recv(NULL, &ref_time, MPI_UNSIGNED_LONG_LONG, 0, 1, MPI_COMM_WORLD, &status);
uint64_t ref_time[2];
PMPI_Recv(ref_time, 2, MPI_UNSIGNED_LONG_LONG, 0, 1, MPI_COMM_WORLD, &status);
// our offset is the reference time minus our timestamp between messages.
offset = ref_time - mytime;
offset = ref_time[0] - mytime[ref_time[1]];
printf(" %d: mytime: %lu reftime: %lu offset: %ld\n", rank, mytime[ref_time[1]], ref_time[0], offset);
}
// synchronize all ranks again
PMPI_Barrier(MPI_COMM_WORLD);

return offset;
}

} // namespace otf2
/* Return this process's rank in MPI_COMM_WORLD.
 * The rank is kept in a function-local static; PMPI_Comm_rank is called
 * again on each call for as long as the stored value is still -1.
 * The return code of PMPI_Comm_rank is not inspected. */
int otf2_listener::getCommRank() {
    static int cached_rank{-1};
    if (cached_rank != -1) {
        return cached_rank;
    }
    PMPI_Comm_rank(MPI_COMM_WORLD, &cached_rank);
    return cached_rank;
}

/* Return the number of ranks in MPI_COMM_WORLD.
 * The size is kept in a function-local static; PMPI_Comm_size is called
 * again on each call for as long as the stored value is still -1.
 * The return code of PMPI_Comm_size is not inspected. */
int otf2_listener::getCommSize() {
    static int cached_size{-1};
    if (cached_size != -1) {
        return cached_size;
    }
    PMPI_Comm_size(MPI_COMM_WORLD, &cached_size);
    return cached_size;
}

} // namespace apex

#endif
34 changes: 34 additions & 0 deletions src/apex/otf2_listener_nompi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2014-2021 Kevin Huck
* Copyright (c) 2014-2021 University of Oregon
*
* Distributed under the Boost Software License, Version 1.0. (See accompanying
* file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/

/* This file contains the fallback (no MPI, no HPX networking) implementations
of the communication routines needed to support OTF2 tracing, for example
event unification and clock synchronization.
*/

// only compile this file if we have NO networking support!
#if !defined(HPX_HAVE_NETWORKING) && !defined(APEX_HAVE_MPI)

#include "otf2_listener.hpp"

namespace apex {

/* Fallback used when neither HPX networking nor MPI is compiled in:
 * no clock exchange is performed, so the reported offset is always zero. */
int64_t otf2_listener::synchronizeClocks(void) {
    const int64_t no_offset = 0;
    return no_offset;
}

/* Fallback used when neither HPX networking nor MPI is compiled in:
 * report the node id stored in the APEX instance. */
int otf2_listener::getCommRank() {
    const int node_id = apex::instance()->get_node_id();
    return node_id;
}

/* Fallback used when neither HPX networking nor MPI is compiled in:
 * report the rank count stored in the APEX instance. */
int otf2_listener::getCommSize() {
    const int rank_count = apex::instance()->get_num_ranks();
    return rank_count;
}

} // namespace apex

#endif
22 changes: 20 additions & 2 deletions src/apex/pthread_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ class pthread_wrapper {
unsigned int _timeout_microseconds;
public:
std::atomic<bool> _running;
std::atomic<bool> _attached;
pthread_wrapper(void*(*func)(void*), void* context,
unsigned int timeout_microseconds) :
done(false),
_func(func),
_context_object(context),
_timeout_microseconds(timeout_microseconds),
_running(false) {
_running(false),
_attached(true) {
pthread_mutexattr_t Attr;
pthread_mutexattr_init(&Attr);
pthread_mutexattr_settype(&Attr, PTHREAD_MUTEX_ERRORCHECK);
Expand All @@ -62,6 +64,21 @@ class pthread_wrapper {
perror("Error: pthread_create (1) fails\n");
exit(1);
}
// be free, little thread!
ret = pthread_detach(worker_thread);
if (ret != 0) {
switch (ret) {
case ESRCH:
// no thread found?
case EINVAL:
// not joinable?
default:
errno = ret;
perror("Warning: pthread_detach failed\n");
return;
}
}
_attached = false;
};

void set_timeout(unsigned int timeout_microseconds) {
Expand All @@ -78,7 +95,7 @@ class pthread_wrapper {
// In some cases, the proc_read thread is already done
// and if we try to join it, we get a segv. So, check to
// see if it is still running before joining.
if (_running) {
if (_running && _attached) {
int ret = pthread_join(worker_thread, &retval);
if (ret != 0) {
switch (ret) {
Expand Down Expand Up @@ -141,6 +158,7 @@ class pthread_wrapper {
if (rc == ETIMEDOUT) {
return true;
} else if (rc == EINVAL) {
_running = false;
pthread_mutex_unlock(&_my_mutex);
return false;
} else if (rc == EPERM) {
Expand Down
3 changes: 1 addition & 2 deletions src/examples/MPITest/mpi_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ static int myrank = -1;
static int commsize = -1;

int main(int argc, char **argv) {
DEBUG_MSG;

/* Initialize MPI */

/*
Expand All @@ -38,6 +36,7 @@ int main(int argc, char **argv) {

MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm_size(MPI_COMM_WORLD, &commsize);
DEBUG_MSG;
apex::init("MPI TEST", myrank, commsize);
apex::profiler* p = apex::start((apex_function_address)(main));
MPI_Barrier(MPI_COMM_WORLD);
Expand Down

0 comments on commit 0cd203c

Please sign in to comment.