diff --git a/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc b/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc
index 9336698c02..1a6833a477 100644
--- a/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc
+++ b/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc
@@ -656,7 +656,9 @@ void TemperedLB::doLBStages(LoadType start_imb) {
     underloaded_.clear();
     load_info_.clear();
     other_rank_clusters_.clear();
+    max_load_over_iters_.clear();
     is_overloaded_ = is_underloaded_ = false;
+    is_subclustering_ = false;
 
     LoadType best_imb_this_trial = start_imb + 10;
 
@@ -678,6 +680,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
       underloaded_.clear();
       load_info_.clear();
       is_overloaded_ = is_underloaded_ = false;
+      is_subclustering_ = false;
       other_rank_clusters_.clear();
 
       // Not clearing shared_block_size_ because this never changes and
@@ -776,7 +779,12 @@ void TemperedLB::doLBStages(LoadType start_imb) {
         LoadType(this_new_load_)
      );
 
-      if (rollback_ || theConfig()->vt_debug_temperedlb || (iter_ == num_iters_ - 1)) {
+      if (
+        rollback_ ||
+        theConfig()->vt_debug_temperedlb ||
+        (iter_ == num_iters_ - 1) ||
+        transfer_type_ == TransferTypeEnum::SwapClusters
+      ) {
         runInEpochCollective("TemperedLB::doLBStages -> Rank_load_modeled", [=] {
           // Perform the reduction for Rank_load_modeled -> processor load only
           proxy_.allreduce<&TemperedLB::loadStatsHandler, collective::PlusOp>(
@@ -843,6 +851,8 @@ void TemperedLB::loadStatsHandler(std::vector<balance::LoadData> const& vec) {
   auto const& in = vec[0];
   new_imbalance_ = in.I();
 
+  max_load_over_iters_.push_back(in.max());
+
   auto this_node = theContext()->getNode();
   if (this_node == 0) {
     vt_debug_print(
@@ -1625,6 +1635,62 @@ auto TemperedLB::removeClusterToSend(SharedIDType shared_id) {
   );
 }
 
+void TemperedLB::considerSubClustersAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
+  is_swapping_ = true;
+
+  // auto const& try_clusters = msg->locked_clusters;
+  // auto const& try_rank = msg->locked_node;
+  auto const& try_load = msg->locked_load;
+  auto const& try_total_bytes = msg->locked_bytes;
+
+  // Get the shared blocks currently residing on this rank
+  auto shared_blocks_here = getSharedBlocksHere();
+
+  // Shared IDs that, when added to the locked rank, don't put it over the
+  // memory limit
+  std::set<SharedIDType> possible_transfers;
+
+  for (auto const& shared_id : shared_blocks_here) {
+    if (try_total_bytes + shared_block_size_[shared_id] < mem_thresh_) {
+      possible_transfers.insert(shared_id);
+    }
+  }
+
+  // Now we will greedily try to find a combination of objects that will
+  // reduce our max load
+
+  // We can prune some clusters out of the mix based on the requirement that
+  // the transfer is beneficial
+  auto const amount_over_average = this_new_load_ - target_max_load_;
+  auto const amount_under_average = target_max_load_ - try_load;
+
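+  // amount_over_average is the minimum load that must leave this rank to
+  // bring it down to the target; amount_under_average is the headroom the
+  // locked rank has before it would exceed the target.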
+  // Any sub-cluster that is smaller than amount_over_average or greater than
+  // amount_under_average we can just skip. We start by skipping all entire
+  // clusters that don't fit these criteria, since their sub-clusters will
+  // also be eliminated
+
+  std::set<SharedIDType> clusters_to_split;
+
+  for (auto const& [src_shared_id, src_cluster] : cur_clusters_) {
+    auto const& [src_cluster_bytes, src_cluster_load] = src_cluster;
+    if (
+      src_cluster_load < amount_over_average or
+      src_cluster_load > amount_under_average
+    ) {
+      // skip it
+    } else {
+      clusters_to_split.insert(src_shared_id);
+    }
+  }
+
+  is_swapping_ = false;
+
+  if (pending_actions_.size() > 0) {
+    auto action = pending_actions_.back();
+    pending_actions_.pop_back();
+    action();
+  }
+}
+
 void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
   is_swapping_ = true;
 
@@ -1849,7 +1915,11 @@ void TemperedLB::lockObtained(LockedInfoMsg* in_msg) {
 
   auto action = [this, msg, cur_epoch]{
     theMsg()->pushEpoch(cur_epoch);
-    considerSwapsAfterLock(msg);
+    if (is_subclustering_) {
+      considerSubClustersAfterLock(msg);
+    } else {
+      considerSwapsAfterLock(msg);
+    }
     theMsg()->popEpoch(cur_epoch);
     theTerm()->consume(cur_epoch);
   };
@@ -1899,7 +1969,92 @@ void TemperedLB::satisfyLockRequest() {
   }
 }
 
+void TemperedLB::trySubClustering() {
+  is_subclustering_ = true;
+  n_transfers_swap_ = 0;
+
+  auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: subCluster");
+  theTerm()->pushEpoch(lazy_epoch);
+
+  auto const this_node = theContext()->getNode();
+
+  // Only ranks that are close to the max should do this, otherwise it's a
+  // waste. Very aggressive to start.
+  if (
+    auto n_iters = max_load_over_iters_.size();
+    this_new_load_ / max_load_over_iters_[n_iters - 1] > 0.80
+  ) {
+    for (auto const& [try_rank, try_clusters] : other_rank_clusters_) {
+      auto const try_num_clusters = try_clusters.size();
+
+      // Only target ranks that have fewer clusters than this one and are
+      // underloaded. Random constants for now
+      if (try_num_clusters < cur_clusters_.size()) {
+        if (
+          auto target_rank_load = load_info_.find(try_rank)->second;
+          target_rank_load < target_max_load_
+        ) {
+          // The c-value is now the ratio of load compared to this rank:
+          // prefer ranks that have less load and fewer clusters.
+          proxy_[try_rank].template send<&TemperedLB::tryLock>(
+            this_node, this_new_load_ / target_rank_load
+          );
+        }
+      }
+
+    }
+
+  } else {
+    // do nothing--not loaded enough; this rank may instead be a target to
+    // receive load
+  }
+
+  // Finalize the epoch; we have sent our initial round of messages.
+  // From here everything is message-driven.
+  theTerm()->finishedEpoch(lazy_epoch);
+  theTerm()->popEpoch(lazy_epoch);
+  vt::runSchedulerThrough(lazy_epoch);
+
+  vt_debug_print(
+    normal, temperedlb,
+    "After subclustering iteration: total memory usage={}, shared blocks here={}, "
+    "memory_threshold={}, load={}\n", computeMemoryUsage(),
+    getSharedBlocksHere().size(), mem_thresh_, this_new_load_
+  );
+
+  int n_rejected = 0;
+
+  // Report on rejection rate in debug mode
+  if (theConfig()->vt_debug_temperedlb) {
+    runInEpochCollective("TemperedLB::swapClusters -> compute rejection", [=] {
+      proxy_.allreduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>(
+        n_rejected, n_transfers_swap_
+      );
+    });
+  }
+}
+
 void TemperedLB::swapClusters() {
+  // Do the test to see if we should start sub-clustering. This is probably
+  // far too aggressive. A more conservative check, at the cost of more
+  // computation, would be to test whether a cluster is actually blocking
+  // progress.
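+  // Concretely: loadStatsHandler records the global max load each iteration
+  // when cluster swaps are enabled; if the last two recorded values agree to
+  // within 1%, cluster swaps are assumed to have stalled and we fall back to
+  // sub-clustering.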
+  if (auto const len = max_load_over_iters_.size(); len > 2) {
+    double const i1 = max_load_over_iters_[len-1];
+    double const i2 = max_load_over_iters_[len-2];
+
+    vt_debug_print(
+      terse, temperedlb,
+      "swapClusters: check for subclustering: i1={}, i2={},"
+      " criteria=abs={} tol={}\n",
+      i1, i2, std::abs(i1 - i2), 0.01*i1
+    );
+
+    // the max is mostly stable
+    if (std::abs(i1 - i2) < 0.01*i1) {
+      trySubClustering();
+      return;
+    }
+  }
+
   n_transfers_swap_ = 0;
 
   auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: swapClusters");
diff --git a/src/vt/vrt/collection/balance/temperedlb/temperedlb.h b/src/vt/vrt/collection/balance/temperedlb/temperedlb.h
index f5213c3bf3..79f58cfd21 100644
--- a/src/vt/vrt/collection/balance/temperedlb/temperedlb.h
+++ b/src/vt/vrt/collection/balance/temperedlb/temperedlb.h
@@ -225,11 +225,24 @@ struct TemperedLB : BaseLB {
    */
   void considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg);
 
+  /**
+   * \brief Consider possible sub-cluster transfers with all the up-to-date
+   * info from a rank
+   *
+   * \param[in] msg update message with all the info
+   */
+  void considerSubClustersAfterLock(MsgSharedPtr<LockedInfoMsg> msg);
+
   /**
    * \brief Release a lock on a rank
    */
   void releaseLock();
 
+  /**
+   * \brief Try sub-clustering, i.e., breaking up clusters to improve LB
+   */
+  void trySubClustering();
+
   /**
    * \brief Give a cluster to a rank
    *
@@ -376,6 +389,10 @@ struct TemperedLB : BaseLB {
   int n_transfers_swap_ = 0;
   /// Whether it's mid-swap or not
   bool is_swapping_ = false;
+  /// Max load across ranks, recorded after each iteration
+  std::vector<LoadType> max_load_over_iters_;
+  /// Whether we are sub-clustering
+  bool is_subclustering_ = false;
 };
 
 }}}} /* end namespace vt::vrt::collection::lb */
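For reference, below is a minimal standalone sketch of the pruning window that considerSubClustersAfterLock applies before selecting clusters to split. It is not vt code: the LoadT alias, the plain std::map/std::set containers, and all numeric values are made-up placeholders; only the filtering rule mirrors the patch above.

// Standalone sketch of the sub-clustering pruning window. All values are
// hypothetical; only the filtering rule mirrors the patch.
#include <cstdio>
#include <map>
#include <set>

int main() {
  using LoadT = double;

  LoadT const target_max_load = 100.0;  // desired per-rank max load
  LoadT const this_load       = 115.0;  // this (overloaded) rank
  LoadT const try_load        =  70.0;  // locked (underloaded) rank

  // Minimum load that must leave this rank to reach the target.
  LoadT const amount_over_average  = this_load - target_max_load;   // 15
  // Headroom on the locked rank before it would exceed the target.
  LoadT const amount_under_average = target_max_load - try_load;    // 30

  // Hypothetical cluster loads keyed by shared-block id.
  std::map<int, LoadT> cur_clusters = {{1, 10.0}, {2, 20.0}, {3, 50.0}};

  std::set<int> clusters_to_split;
  for (auto const& [id, load] : cur_clusters) {
    // Skip clusters outside the useful window: too small to help the sender,
    // or so large that moving them would push the receiver past the target.
    if (load < amount_over_average or load > amount_under_average) {
      continue;
    }
    clusters_to_split.insert(id);
  }

  // Prints: window=[15.0, 30.0], candidates=1 (only cluster 2 qualifies)
  std::printf("window=[%.1f, %.1f], candidates=%zu\n",
              amount_over_average, amount_under_average,
              clusters_to_split.size());
  return 0;
}

As the comment in considerSubClustersAfterLock notes, pruning whole clusters against this window first means sub-clusters of the rejected clusters never need to be enumerated, keeping the subsequent greedy search small.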