Skip to content

Commit

Permalink
Merge pull request #652 from lukemartinlogan/dev
Browse files Browse the repository at this point in the history
Fix issue regarding RPC unscalability. Improved RAM management for task and data allocations.
  • Loading branch information
lukemartinlogan authored Dec 30, 2023
2 parents 848e1f2 + 6efc86f commit b897556
Show file tree
Hide file tree
Showing 30 changed files with 616 additions and 362 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ option(BUILD_MPI_TESTS "Build tests which depend on MPI" ON)
option(BUILD_OpenMP_TESTS "Build tests which depend on OpenMP" ON)
option(HERMES_ENABLE_COVERAGE "Check how well tests cover code" OFF)
option(HERMES_ENABLE_DOXYGEN "Check how well the code is documented" OFF)
option(HERMES_REMOTE_DEBUG "Enable remote debug mode on hrun" OFF)

option(HERMES_ENABLE_POSIX_ADAPTER "Build the Hermes POSIX adapter." ON)
option(HERMES_ENABLE_STDIO_ADAPTER "Build the Hermes stdio adapter." OFF)
Expand Down
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,38 @@ docker commit hermes_deps_c lukemartinlogan/hermes_deps
docker push lukemartinlogan/hermes_deps
docker stop /hermes_deps_c
docker rm /hermes_deps_c
```

## Remote Debug

### On personal machine
```
[email protected]
local_port=4000
remote_port=4000
ssh -L ${local_port}:localhost:${remote_port} -fN ${ares_node}
local_port=4001
remote_port=4001
ssh -L ${local_port}:localhost:${remote_port} -fN ${ares_node}
```

On Ares
```
ares_node=ares-comp-27
local_port=4000
remote_port=4000
ssh -L ${local_port}:localhost:${remote_port} -fN ${ares_node}
local_port=4001
remote_port=4001
ssh -L ${local_port}:localhost:${remote_port} -fN ${ares_node}
```

Find PIDs of processes using ports
```
lsof -i :4000
lsof -i :4001
```
2 changes: 2 additions & 0 deletions hrun/config/hrun_server_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ queue_manager:
shm_size: 0g
# The size of the shared memory to allocate for data buffers
data_shm_size: 4g
# The size of the shared memory to allocate for runtime data buffers
rdata_shm_size: 4g

### Define properties of RPCs
rpc:
Expand Down
69 changes: 63 additions & 6 deletions hrun/include/hrun/api/hrun_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class Client : public ConfigurationManager {
}
main_alloc_ = mem_mngr->GetAllocator(main_alloc_id_);
data_alloc_ = mem_mngr->GetAllocator(data_alloc_id_);
rdata_alloc_ = mem_mngr->GetAllocator(rdata_alloc_id_);
header_ = main_alloc_->GetCustomHeader<HrunShm>();
unique_ = &header_->unique_;
node_id_ = header_->node_id_;
Expand Down Expand Up @@ -239,30 +240,86 @@ class Client : public ConfigurationManager {
yield_task->Yield<THREAD_MODEL>();
}

/** Allocate a buffer in a task */
/** Allocate a buffer */
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBufferClient(size_t size) {
return AllocateBufferSafe<TASK_YIELD_STD>(data_alloc_, size);
}

/** Allocate a buffer */
template<int THREAD_MODEL>
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBufferServer(size_t size) {
return AllocateBufferSafe<THREAD_MODEL>(rdata_alloc_, size);
}

/** Allocate a buffer */
template<int THREAD_MODEL>
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBufferServer(size_t size, Task *yield_task) {
return AllocateBufferSafe<THREAD_MODEL>(rdata_alloc_, size,
yield_task);
}

private:
/** Allocate a buffer */
template<int THREAD_MODEL>
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBufferSafe(Allocator *alloc, size_t size) {
HILOG(kDebug, "Heap size for {}/{}: {}",
alloc->GetId().bits_.major_,
alloc->GetId().bits_.minor_,
alloc->GetCurrentlyAllocatedSize());
LPointer<char> p;
p = main_alloc_->AllocateLocalPtr<char>(size);
while (true) {
try {
p = alloc->AllocateLocalPtr<char>(size);
} catch (hshm::Error &e) {
p.shm_.SetNull();
}
if (!p.shm_.IsNull()) {
break;
}
// FlushRoot(DomainId::GetLocal());
Yield<THREAD_MODEL>();
HILOG(kDebug, "{} Waiting to allocate buffer of size {} (1)?", size);
}
return p;
}

/** Allocate a buffer */
template<int THREAD_MODEL>
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBufferServer(size_t size) {
LPointer<char> AllocateBufferSafe(Allocator *alloc, size_t size,
Task *yield_task) {
HILOG(kDebug, "Heap size for {}/{}: {}",
alloc->GetId().bits_.major_,
alloc->GetId().bits_.minor_,
alloc->GetCurrentlyAllocatedSize());
LPointer<char> p;
p = main_alloc_->AllocateLocalPtr<char>(size);
while (true) {
try {
p = alloc->AllocateLocalPtr<char>(size);
} catch (hshm::Error &e) {
p.shm_.SetNull();
}
if (!p.shm_.IsNull()) {
break;
}
// FlushRoot(DomainId::GetLocal());
Yield<THREAD_MODEL>(yield_task);
HILOG(kDebug, "{} Waiting to allocate buffer of size {} (1)?", size);
}
return p;
}

public:
/** Free a buffer */
HSHM_ALWAYS_INLINE
void FreeBuffer(hipc::Pointer &p) {
auto alloc = HERMES_MEMORY_MANAGER->GetAllocator(p.allocator_id_);
alloc->Free(p);
HILOG(kDebug, "Heap size (1) for {}/{}: {}",
HILOG(kDebug, "Heap size for {}/{}: {}",
alloc->GetId().bits_.major_,
alloc->GetId().bits_.minor_,
alloc->GetCurrentlyAllocatedSize());
Expand All @@ -273,7 +330,7 @@ class Client : public ConfigurationManager {
void FreeBuffer(LPointer<char> &p) {
auto alloc = HERMES_MEMORY_MANAGER->GetAllocator(p.shm_.allocator_id_);
alloc->FreeLocalPtr(p);
HILOG(kDebug, "Heap size (2) for {}/{}: {}",
HILOG(kDebug, "Heap size for {}/{}: {}",
alloc->GetId().bits_.major_,
alloc->GetId().bits_.minor_,
alloc->GetCurrentlyAllocatedSize());
Expand Down
3 changes: 3 additions & 0 deletions hrun/include/hrun/api/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ class ConfigurationManager {
hipc::allocator_id_t(0, 1);
static inline const hipc::allocator_id_t data_alloc_id_ =
hipc::allocator_id_t(1, 1);
static inline const hipc::allocator_id_t rdata_alloc_id_ =
hipc::allocator_id_t(2, 1);
hipc::Allocator *main_alloc_;
hipc::Allocator *data_alloc_;
hipc::Allocator *rdata_alloc_;
bool is_being_initialized_;
bool is_initialized_;
bool is_terminated_;
Expand Down
6 changes: 5 additions & 1 deletion hrun/include/hrun/config/config_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ struct QueueManagerInfo {
size_t shm_size_;
/** Shared memory data region name */
std::string data_shm_name_;
/** Data shared memory region size */
/** Shared memory runtime data region name */
std::string rdata_shm_name_;
/** Client data shared memory region size */
size_t data_shm_size_;
/** Runtime data shared memory region size */
size_t rdata_shm_size_;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions hrun/include/hrun/config/config_server_default.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const inline char* kHrunServerDefaultConfigStr =
" shm_size: 0g\n"
" # The size of the shared memory to allocate for data buffers\n"
" data_shm_size: 4g\n"
" # The size of the shared memory to allocate for runtime data buffers\n"
" rdata_shm_size: 4g\n"
"\n"
"### Define properties of RPCs\n"
"rpc:\n"
Expand Down
2 changes: 1 addition & 1 deletion hrun/include/hrun/network/rpc_thallium.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ class ThalliumRpc {
std::string server_name = GetServerName(node_id);
tl::remote_procedure remote_proc = client_engine_->define(func_name);
tl::endpoint server = client_engine_->lookup(server_name);
HILOG(kDebug, "Found the server: {}={}", node_id, server_name)
if constexpr(!ASYNC) {
if constexpr (std::is_same<RetT, void>::value) {
remote_proc.disable_response();
Expand Down Expand Up @@ -195,6 +194,7 @@ class ThalliumRpc {
tl::bulk bulk = client_engine_->expose(segments, flag);
if constexpr (!ASYNC) {
if constexpr (std::is_same_v<RetT, void>) {
remote_proc.disable_response();
remote_proc.on(server)(bulk, std::forward<Args>(args)...);
} else {
return remote_proc.on(server)(bulk, std::forward<Args>(args)...);
Expand Down
17 changes: 17 additions & 0 deletions hrun/include/hrun/task_registry/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class TaskLib;
#define TASK_LANE_ALL BIT_OPT(u32, 19)
/** This task flushes the runtime */
#define TASK_FLUSH BIT_OPT(u32, 20)
/** This task is considered a root task */
#define TASK_IS_ROOT BIT_OPT(u32, 21)
/** This task is apart of remote debugging */
#define TASK_REMOTE_DEBUG_MARK BIT_OPT(u32, 31)

Expand Down Expand Up @@ -400,6 +402,21 @@ struct Task : public hipc::ShmContainer {
task_flags_.UnsetBits(TASK_LANE_ALL);
}

/** This task is a root task */
HSHM_ALWAYS_INLINE bool IsRoot() {
return task_flags_.Any(TASK_IS_ROOT);
}

/** Set this task as a root task */
HSHM_ALWAYS_INLINE void SetRoot() {
task_flags_.SetBits(TASK_IS_ROOT);
}

/** Unset this task a sa root task */
HSHM_ALWAYS_INLINE void UnsetRoot() {
task_flags_.UnsetBits(TASK_IS_ROOT);
}

/** Set period in nanoseconds */
HSHM_ALWAYS_INLINE void SetPeriodNs(double ns) {
period_ns_ = ns;
Expand Down
2 changes: 1 addition & 1 deletion hrun/include/hrun/task_registry/task_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class TaskRegistry {
Admin::CreateTaskStateTask *task) {
// Ensure state_id is not NULL
if (state_id.IsNull()) {
HILOG(kError, "The task state ID cannot be null");
HELOG(kError, "The task state ID cannot be null");
task->SetModuleComplete();
return nullptr;
}
Expand Down
Loading

0 comments on commit b897556

Please sign in to comment.