#2201: temperedlb: sub-clustering implemented, disabled by default for now
lifflander committed Dec 7, 2023
1 parent f15552b commit e35ca04
Showing 2 changed files with 240 additions and 24 deletions.
244 changes: 221 additions & 23 deletions src/vt/vrt/collection/balance/temperedlb/temperedlb.cc
@@ -659,6 +659,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
max_load_over_iters_.clear();
is_overloaded_ = is_underloaded_ = false;
is_subclustering_ = false;
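// Queue incoming lock requests until this rank has sent its initial
// messages (swapClusters/trySubClustering flip this flag when ready)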
ready_to_satisfy_locks_ = false;

LoadType best_imb_this_trial = start_imb + 10;

@@ -681,6 +682,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
load_info_.clear();
is_overloaded_ = is_underloaded_ = false;
is_subclustering_ = false;
ready_to_satisfy_locks_ = false;
other_rank_clusters_.clear();

// Not clearing shared_block_size_ because this never changes and
@@ -1576,9 +1578,15 @@ void TemperedLB::originalTransfer() {

void TemperedLB::tryLock(NodeType requesting_node, double criterion_value) {
try_locks_.emplace(requesting_node, criterion_value);

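// Requests queue in try_locks_; service one immediately only if this rank
// is ready and not already locked, otherwise satisfyLockRequest() runs later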
if (ready_to_satisfy_locks_ and not is_locked_) {
satisfyLockRequest();
}
}

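// If 'objs' is empty, every object mapped to 'shared_id' is selected (a
// whole-cluster send); otherwise only the listed objects move, which is how
// a sub-cluster is peeled off.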
auto TemperedLB::removeClusterToSend(SharedIDType shared_id) {
auto TemperedLB::removeClusterToSend(
SharedIDType shared_id, std::set<ObjIDType> objs
) {
std::unordered_map<ObjIDType, LoadType> give_objs;
std::unordered_map<ObjIDType, SharedIDType> give_obj_shared_block;
std::unordered_map<SharedIDType, BytesType> give_shared_blocks_size;
@@ -1594,19 +1602,32 @@ auto TemperedLB::removeClusterToSend(SharedIDType shared_id) {
give_shared_blocks_size[shared_id] = shared_block_size_[shared_id];
}

for (auto const& [obj_id, obj_load] : cur_objs_) {
if (auto iter = obj_shared_block_.find(obj_id); iter != obj_shared_block_.end()) {
if (iter->second == shared_id) {
give_objs[obj_id] = obj_load;
give_obj_shared_block[obj_id] = shared_id;
if (
auto iter2 = give_obj_working_bytes.find(obj_id);
iter2 != give_obj_working_bytes.end()
) {
give_obj_working_bytes[obj_id] = iter2->second;
if (objs.size() == 0) {
for (auto const& [obj_id, obj_load] : cur_objs_) {
if (auto iter = obj_shared_block_.find(obj_id); iter != obj_shared_block_.end()) {
if (iter->second == shared_id) {
give_objs[obj_id] = obj_load;
give_obj_shared_block[obj_id] = shared_id;
if (
auto iter2 = give_obj_working_bytes.find(obj_id);
iter2 != give_obj_working_bytes.end()
) {
give_obj_working_bytes[obj_id] = iter2->second;
}
}
}
}
} else {
for (auto const& obj_id : objs) {
give_objs[obj_id] = cur_objs_.find(obj_id)->second;
give_obj_shared_block[obj_id] = shared_id;
if (
auto iter2 = give_obj_working_bytes.find(obj_id);
iter2 != give_obj_working_bytes.end()
) {
give_obj_working_bytes[obj_id] = iter2->second;
}
}
}

auto const blocks_here_before = getSharedBlocksHere();
@@ -1638,23 +1659,62 @@ auto TemperedLB::removeClusterToSend(SharedIDType shared_id) {
void TemperedLB::considerSubClustersAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
is_swapping_ = true;

// auto const& try_clusters = msg->locked_clusters;
// auto const& try_rank = msg->locked_node;
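// The criterion scores moving a sub-cluster (src) onto the locked rank
// (try): the reduction in the pairwise max load, w_max_0 - w_max_new, or
// -infinity when the move would exceed mem_thresh_ on either side. Worked
// example: src load 10 sending a 4-load sub-cluster to a rank with load 5:
// w_max_0 = max(10, 5) = 10; w_max_new = max(6, 9) = 9; criterion = 1.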
auto criterion = [&,this](auto src_cluster, auto try_cluster) -> double {
auto const& [src_id, src_bytes, src_load] = src_cluster;
auto const& [try_rank, try_total_load, try_total_bytes] = try_cluster;

auto const src_after_mem = current_memory_usage_;
auto const try_after_mem = try_total_bytes + src_bytes;

if (src_after_mem > mem_thresh_ or try_after_mem > mem_thresh_) {
return - std::numeric_limits<double>::infinity();
}

auto const before_work_src = this_new_load_;
auto const before_work_try = try_total_load;
auto const w_max_0 = std::max(before_work_src, before_work_try);

auto const after_work_src = this_new_load_ - src_load;
auto const after_work_try = before_work_try + src_load;
auto const w_max_new = std::max(after_work_src, after_work_try);

return w_max_0 - w_max_new;
};

auto const& try_clusters = msg->locked_clusters;
auto const& try_rank = msg->locked_node;
auto const& try_load = msg->locked_load;
auto const& try_total_bytes = msg->locked_bytes;

vt_print(
temperedlb,
"considerSubClustersAfterLock: try_rank={} try_load={}\n", try_rank, try_load
);

// get the shared blocks currently residing on this rank
auto shared_blocks_here = getSharedBlocksHere();

// Shared IDs when added to this rank don't put it over the limit
std::set<SharedIDType> possible_transfers;

for (auto const& shared_id : shared_blocks_here) {
if (try_total_bytes + shared_block_size_[shared_id] < mem_thresh_) {
// Allow shared blocks that don't put it over memory or already exist on
// try_rank
if (try_clusters.find(shared_id) == try_clusters.end()) {
if (try_total_bytes + shared_block_size_[shared_id] < mem_thresh_) {
possible_transfers.insert(shared_id);
}
} else {
possible_transfers.insert(shared_id);
}
}
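// A block already resident on try_rank adds no new block memory there, so
// the memory check only applies to blocks try_rank does not yet hold.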

vt_print(
temperedlb,
"considerSubClustersAfterLock: possible_transfers={}\n",
possible_transfers.size()
);

// Now, we will greedily try to find a combination of objects that will
// reduce our max load

@@ -1663,25 +1723,115 @@ void TemperedLB::considerSubClustersAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
auto const amount_over_average = this_new_load_ - target_max_load_;
auto const amount_under_average = target_max_load_ - try_load;

// Any sub-cluster that is smaller than amount_over_avergae or great than
// Any sub-cluster that is smaller than amount_over_average or smaller than
// amount_under_average we can just skip. We start by skipping all entire
// clusters that don't fit this criterion, since their sub-clusters will
// also be eliminated

vt_print(
temperedlb,
"considerSubClustersAfterLock: over={}, under={}\n", amount_over_average,
amount_under_average
);

std::set<SharedIDType> clusters_to_split;

for (auto const& [src_shared_id, src_cluster] : cur_clusters_) {
auto const& [src_cluster_bytes, src_cluster_load] = src_cluster;
if (
src_cluster_load < amount_over_average or
src_cluster_load > amount_under_average
src_cluster_load < amount_under_average
) {
// skip it
} else {
clusters_to_split.insert(src_shared_id);
}
}
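// Greedy scan: for each splittable cluster, grow a prefix of its objects
// (in ObjLoad-set order) and score each prefix with the criterion; any
// positive score overwrites the previous pick, so the last positive
// candidate encountered is the one sent.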

double best_c_try = -1.0;
std::set<ObjIDType> best_selected;
SharedIDType best_id = -1;
for (auto const& shared_id : clusters_to_split) {
auto const& [src_cluster_bytes, src_cluster_load] = cur_clusters_[shared_id];

std::set<ObjLoad> objs;
for (auto const& [obj_id, shared_id_obj] : obj_shared_block_) {
if (shared_id_obj == shared_id) {
objs.emplace(obj_id, cur_objs_[obj_id]);
}
}

std::set<ObjIDType> selected;
LoadType load_sum = 0;
for (auto const& [obj_id, load] : objs) {
load_sum += load;
selected.insert(obj_id);

// We will not consider "swaps" that leave an empty cluster behind here,
// i.e., selecting every object in the cluster (that is a whole-cluster
// move, not a sub-cluster)
if (selected.size() != objs.size()) {
auto src_cluster_bytes_add =
try_clusters.find(shared_id) == try_clusters.end() ? src_cluster_bytes : 0;

double c_try = criterion(
std::make_tuple(shared_id, src_cluster_bytes_add, load_sum),
std::make_tuple(try_rank, try_load, try_total_bytes)
);

vt_debug_print(
terse, temperedlb,
"testing a possible sub-cluster (rank {}): id={} load={} c_try={}, "
"amount over average={}, amount under average={}\n",
try_rank, shared_id, load_sum, c_try, amount_over_average,
amount_under_average
);

if (c_try > 0.0) {
best_c_try = c_try;
best_selected = selected;
best_id = shared_id;
}
}
}
}

if (best_c_try > 0.0) {
vt_debug_print(
normal, temperedlb,
"best_c_try={}, picked subcluster with id={} for rank ={}\n",
best_c_try, best_id, try_rank
);

auto const& [
give_objs,
give_obj_shared_block,
give_shared_blocks_size,
give_obj_working_bytes
] = removeClusterToSend(best_id, best_selected);

auto const this_node = theContext()->getNode();

runInEpochRooted("giveSubCluster", [&]{
proxy_[try_rank].template send<&TemperedLB::giveCluster>(
this_node,
give_shared_blocks_size,
give_objs,
give_obj_shared_block,
give_obj_working_bytes,
-1
);
});
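// The rooted epoch ensures the transfer has completed before the lock on
// try_rank is released below.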

computeClusterSummary();

vt_debug_print(
normal, temperedlb,
"best_c_try={}, sub-cluster sent to rank={}\n",
best_c_try, try_rank
);
}

proxy_[try_rank].template send<&TemperedLB::releaseLock>();

is_swapping_ = false;

if (pending_actions_.size() > 0) {
Expand Down Expand Up @@ -1905,9 +2055,9 @@ void TemperedLB::lockObtained(LockedInfoMsg* in_msg) {
auto msg = promoteMsg(in_msg);

vt_debug_print(
verbose, temperedlb,
"lockObtained: is_locked_={}\n",
is_locked_
normal, temperedlb,
"lockObtained: is_locked_={}, is_subclustering_={}\n",
is_locked_, is_subclustering_
);

auto cur_epoch = theMsg()->getEpoch();
@@ -1978,22 +2128,54 @@ void TemperedLB::trySubClustering() {

auto const this_node = theContext()->getNode();

vt_print(
temperedlb,
"SUBcluster: load={} max_load={}\n",
this_new_load_, max_load_over_iters_.back()
);

// Only ranks that are close to max should do this...otherwise it's a waste
// Very aggressive to start.
if (
auto n_iters = max_load_over_iters_.size();
this_new_load_ / max_load_over_iters_[n_iters - 1] > 0.80
) {
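// e.g., this rank's load 8.5 against last iteration's max load 10 gives
// 0.85 > 0.80, so this rank attempts to shed a sub-cluster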
BytesType avg_cluster_bytes = 0;
for (auto const& [src_shared_id, src_cluster] : cur_clusters_) {
auto const& [src_cluster_bytes, src_cluster_load] = src_cluster;
avg_cluster_bytes += src_cluster_bytes;
}
avg_cluster_bytes /= cur_clusters_.size();
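// Note: assumes this rank has at least one cluster; an empty cur_clusters_
// would divide by zero here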

for (auto const& [try_rank, try_clusters] : other_rank_clusters_) {
auto const try_num_clusters = try_clusters.size();

// Only target ranks that have fewer clusters and are underloaded.
// Random constants for now
if (try_num_clusters < cur_clusters_.size()) {
BytesType total_clusters_bytes = 0;
for (auto const& [try_shared_id, try_cluster] : try_clusters) {
auto const& [try_cluster_bytes, try_cluster_load] = try_cluster;
total_clusters_bytes += try_cluster_bytes;
}

vt_print(
temperedlb,
"SUBcluster: load={} max_load={}, try_rank={}\n",
this_new_load_, max_load_over_iters_.back(), try_rank
);

// Only target ranks that have room for this rank's average cluster size
if (total_clusters_bytes + avg_cluster_bytes < mem_thresh_) {
if (
auto target_rank_load = load_info_.find(try_rank)->second;
target_rank_load < target_max_load_
) {

vt_print(
temperedlb,
"SUBcluster: load={} max_load={}, try_rank={} sending lock\n",
this_new_load_, max_load_over_iters_.back(), try_rank
);

// The c-value is now the ratio of the target's load to this rank's load.
// Prefer ranks that have less load and fewer clusters.
proxy_[try_rank].template send<&TemperedLB::tryLock>(
@@ -2008,6 +2190,19 @@ void TemperedLB::trySubClustering() {
// do nothing--not loaded enough, may be a target to put load
}

// We have to be very careful here since we allow some reentrancy.
constexpr int turn_scheduler_times = 10;
for (int i = 0; i < turn_scheduler_times; i++) {
theSched()->runSchedulerOnceImpl();
}

while (not theSched()->workQueueEmpty()) {
theSched()->runSchedulerOnceImpl();
}
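// Turning the scheduler lets tryLock messages that raced with our sends be
// enqueued in try_locks_ before we flip the flag and start satisfying them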

ready_to_satisfy_locks_ = true;
satisfyLockRequest();

// Finalize epoch, we have sent our initial round of messages
// from here everything is message driven
theTerm()->finishedEpoch(lazy_epoch);
@@ -2034,6 +2229,7 @@ void TemperedLB::trySubClustering() {
}

void TemperedLB::swapClusters() {
#if 0
// Do the test to see if we should start sub-clustering. This is probably far
// too aggressive. We could instead use a more conservative check, requiring
// more computation, to see whether a cluster is blocking progress.
@@ -2054,6 +2250,7 @@ void TemperedLB::swapClusters() {
return;
}
}
#endif
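// The sub-clustering trigger above is compiled out for now (disabled by
// default, per the commit message); cluster swaps below run unconditionally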

n_transfers_swap_ = 0;

Expand Down Expand Up @@ -2149,6 +2346,7 @@ void TemperedLB::swapClusters() {
theSched()->runSchedulerOnceImpl();
}

ready_to_satisfy_locks_ = true;
satisfyLockRequest();

// Finalize epoch, we have sent our initial round of messages
