Merge pull request #651 from lukemartinlogan/dev
Reduce queue volatility by limiting proc queue size and scheduling pace
lukemartinlogan authored Dec 27, 2023
2 parents 9603b15 + f1b0a18 commit 848e1f2
Showing 7 changed files with 62 additions and 11 deletions.
3 changes: 2 additions & 1 deletion benchmark/hermes_api_bench.cc
@@ -26,7 +26,7 @@ void GatherTimes(std::string test_name, size_t io_size, MpiTimer &t) {
if (t.rank_ == 0) {
double max = t.GetSec();
double mbps = io_size / t.GetUsec();
-    HIPRINT("{}: Time: {} sec, MBps (or MOps): {}, Count: {}, Nprocs: {}\n",
+    HILOG(kInfo, "{}: Time: {} sec, MBps (or MOps): {}, Count: {}, Nprocs: {}\n",
test_name, max, mbps, io_size, t.nprocs_);
}
}
@@ -41,6 +41,7 @@ void PutTest(int nprocs, int rank,
t.Resume();
for (int j = 0; j < repeat; ++j) {
for (size_t i = 0; i < blobs_per_rank; ++i) {
+      // HILOG(kInfo, "On blob {}", i)
size_t blob_name_int = rank * blobs_per_rank + i;
std::string name = std::to_string(blob_name_int);
bkt.AsyncPut(name, blob, ctx);
29 changes: 28 additions & 1 deletion hrun/include/hrun/queue_manager/queues/hshm_queue.h
@@ -239,10 +239,20 @@ struct MultiQueueT<Hshm> : public hipc::ShmContainer {

/** Emplace a SHM pointer to a task */
HSHM_ALWAYS_INLINE
-  bool Emplace(u32 prio, u32 lane_hash, hipc::Pointer &p, bool complete = false) {
+  bool Emplace(u32 prio, u32 lane_hash,
+               hipc::Pointer &p, bool complete = false) {
return Emplace(prio, lane_hash, LaneData(p, complete));
}

+  /**
+   * Emplace a SHM pointer to a task if the queue utilization is less than 50%
+   * */
+  HSHM_ALWAYS_INLINE
+  bool EmplaceFrac(u32 prio, u32 lane_hash,
+                   hipc::Pointer &p, bool complete = false) {
+    return EmplaceFrac(prio, lane_hash, LaneData(p, complete));
+  }

/** Emplace a SHM pointer to a task */
bool Emplace(u32 prio, u32 lane_hash, const LaneData &data) {
if (IsEmplacePlugged()) {
@@ -255,6 +265,23 @@ struct MultiQueueT<Hshm> : public hipc::ShmContainer {
return !ret.IsNull();
}

+  /**
+   * Emplace a SHM pointer to a task if the queue utilization is less than 50%
+   * */
+  bool EmplaceFrac(u32 prio, u32 lane_hash, const LaneData &data) {
+    if (IsEmplacePlugged()) {
+      WaitForEmplacePlug();
+    }
+    LaneGroup &lane_group = GetGroup(prio);
+    u32 lane_id = lane_hash % lane_group.num_lanes_;
+    Lane &lane = GetLane(lane_group, lane_id);
+    if (lane.GetSize() * 2 > lane.GetDepth()) {
+      return false;
+    }
+    hshm::qtok_t ret = lane.emplace(data);
+    return !ret.IsNull();
+  }

/**
* Change the number of active lanes
* This assumes that PlugForResize and UnplugForResize are called externally.
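Unlike Emplace, which ends up spin-waiting when the underlying lane is full, the new EmplaceFrac fails fast once a lane passes 50% utilization. A minimal caller-side sketch of the intended back-off pattern; the helper function and its retry policy are hypothetical (the commit's real caller is proc_queue.cc below), while MultiQueue, the EmplaceFrac signature, and the Yield call come from this diff:

  // Hypothetical helper, illustrative only: retry until the target lane
  // drops back to 50% utilization or less.
  void EmplaceWithBackoff(MultiQueue *queue, u32 prio, u32 lane_hash,
                          hipc::Pointer &task_ptr) {
    while (!queue->EmplaceFrac(prio, lane_hash, task_ptr)) {
      HERMES_THREAD_MODEL->Yield();  // lane more than half full; try later
    }
  }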
21 changes: 19 additions & 2 deletions hrun/include/hrun/queue_manager/queues/mpsc_queue.h
@@ -175,7 +175,8 @@ class mpsc_queue : public ShmContainer {

// Check if there's space in the queue.
if (size > queue.size()) {
HILOG(kInfo, "Queue {}/{} is full, waiting for space", id_, queue_->size());
HILOG(kDebug, "Queue {}/{} is full, waiting for space",
id_, queue_->size());
while (true) {
head = head_.load();
size = tail - head + 1;
@@ -184,7 +185,7 @@
}
HERMES_THREAD_MODEL->Yield();
}
HILOG(kInfo, "Queue {}/{} got scheduled", id_, queue_->size());
HILOG(kDebug, "Queue {}/{} got scheduled", id_, queue_->size());
}

// Emplace into queue at our slot
@@ -284,6 +285,22 @@ class mpsc_queue : public ShmContainer {
return qtok_t::GetNull();
}
}

+  /** Current size of queue */
+  size_t GetSize() {
+    size_t tail = tail_.load();
+    size_t head = head_.load();
+    if (tail <= head) {
+      return 0;
+    } else {
+      return tail - head;
+    }
+  }

+  /** Max depth of queue */
+  size_t GetDepth() {
+    return (*queue_).size();
+  }
};

} // namespace hshm::ipc
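Note that GetSize loads tail_ and head_ separately, so under concurrent producers the result is only an approximation; that is acceptable here because it feeds a throttling heuristic, not a correctness check. A sketch of how the two accessors compose into EmplaceFrac's test (the helper name is hypothetical):

  // Hypothetical helper equivalent to the check inside EmplaceFrac.
  // For a lane of depth 1024, emplaces are rejected once 513 or more
  // slots are occupied, since 513 * 2 > 1024.
  template <typename LaneT>
  bool OverHalfFull(LaneT &lane) {
    return lane.GetSize() * 2 > lane.GetDepth();
  }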
7 changes: 5 additions & 2 deletions hrun/tasks_required/proc_queue/src/proc_queue.cc
@@ -48,8 +48,11 @@ class Server : public TaskLib {
task->is_fire_forget_ = true;
}
MultiQueue *real_queue = HRUN_CLIENT->GetQueue(QueueId(ptr->task_state_));
-        real_queue->Emplace(ptr->prio_, ptr->lane_hash_, task->sub_run_.shm_);
-        task->phase_ = PushTaskPhase::kWaitSchedule;
+        bool ret = real_queue->EmplaceFrac(
+            ptr->prio_, ptr->lane_hash_, task->sub_run_.shm_);
+        if (ret) {
+          task->phase_ = PushTaskPhase::kWaitSchedule;
+        }
}
case PushTaskPhase::kWaitSchedule: {
Task *&ptr = task->sub_run_.ptr_;
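The scheduling pace falls out of the phase machine: PushTask is long-running, so when EmplaceFrac rejects, task->phase_ simply remains kSchedule and the emplace is retried the next time the worker polls the task. A rough model of that behavior (hypothetical loop, not the runtime's actual dispatch code):

  // Hypothetical model of the worker re-polling a long-running PushTask.
  while (task->phase_ == PushTaskPhase::kSchedule) {
    if (real_queue->EmplaceFrac(ptr->prio_, ptr->lane_hash_,
                                task->sub_run_.shm_)) {
      task->phase_ = PushTaskPhase::kWaitSchedule;  // proceed on next poll
    } else {
      HERMES_THREAD_MODEL->Yield();  // lane over 50% full; pace the producer
    }
  }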
@@ -424,9 +424,10 @@ class Client : public TaskLibClient {

/** Initialize automatic flushing */
void AsyncFlushDataConstruct(FlushDataTask *task,
-                               const TaskNode &task_node) {
+                               const TaskNode &task_node,
+                               size_t period_ms) {
HRUN_CLIENT->ConstructTask<FlushDataTask>(
-        task, task_node, id_);
+        task, task_node, id_, period_ms);
}
HRUN_TASK_NODE_PUSH_ROOT(FlushData);

@@ -1158,7 +1158,8 @@ struct FlushDataTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
HSHM_ALWAYS_INLINE explicit
FlushDataTask(hipc::Allocator *alloc,
const TaskNode &task_node,
-                const TaskStateId &state_id) : Task(alloc) {
+                const TaskStateId &state_id,
+                size_t period_ms) : Task(alloc) {
// Initialize task
task_node_ = task_node;
lane_hash_ = 0;
@@ -1171,7 +1172,7 @@ struct FlushDataTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
TASK_LONG_RUNNING |
TASK_COROUTINE |
TASK_REMOTE_DEBUG_MARK);
-    SetPeriodSec(5);  // TODO(llogan): don't hardcode this
+    SetPeriodMs((double)period_ms);
domain_id_ = DomainId::GetLocal();
}

3 changes: 2 additions & 1 deletion tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
@@ -139,7 +139,8 @@ class Server : public TaskLib {
bkt_mdm_.Init(task->bkt_mdm_, HRUN_ADMIN->queue_id_);
stager_mdm_.Init(task->stager_mdm_, HRUN_ADMIN->queue_id_);
op_mdm_.Init(task->op_mdm_, HRUN_ADMIN->queue_id_);
-    flush_task_ = blob_mdm_.AsyncFlushData(task->task_node_ + 1);
+    flush_task_ = blob_mdm_.AsyncFlushData(
+        task->task_node_ + 1, HERMES_SERVER_CONF.borg_.flush_period_);
}
task->SetModuleComplete();
}
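With the period threaded through, the flush cadence now comes from the BORG configuration (HERMES_SERVER_CONF.borg_.flush_period_) instead of the old hardcoded 5 s. Any millisecond value can be passed; for example (500 is an arbitrary illustration, not a default from this commit):

  // Hypothetical call site flushing every 500 ms.
  flush_task_ = blob_mdm_.AsyncFlushData(task->task_node_ + 1, 500);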
