Commit f15552b

#2201: temperedlb: start implementing sub-clustering

lifflander committed Dec 6, 2023
1 parent 172087d commit f15552b

Showing 2 changed files with 174 additions and 2 deletions.
159 changes: 157 additions & 2 deletions src/vt/vrt/collection/balance/temperedlb/temperedlb.cc
@@ -656,7 +656,9 @@ void TemperedLB::doLBStages(LoadType start_imb) {
underloaded_.clear();
load_info_.clear();
other_rank_clusters_.clear();
max_load_over_iters_.clear();
is_overloaded_ = is_underloaded_ = false;
is_subclustering_ = false;

LoadType best_imb_this_trial = start_imb + 10;

@@ -678,6 +680,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
underloaded_.clear();
load_info_.clear();
is_overloaded_ = is_underloaded_ = false;
is_subclustering_ = false;
other_rank_clusters_.clear();

// Not clearing shared_block_size_ because this never changes and
@@ -776,7 +779,12 @@ void TemperedLB::doLBStages(LoadType start_imb) {
LoadType(this_new_load_)
);

-      if (rollback_ || theConfig()->vt_debug_temperedlb || (iter_ == num_iters_ - 1)) {
+      if (
+        rollback_ ||
+        theConfig()->vt_debug_temperedlb ||
+        (iter_ == num_iters_ - 1) ||
+        transfer_type_ == TransferTypeEnum::SwapClusters
+      ) {
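// Presumably the SwapClusters case is added so the reduction below also runs
// every iteration in swap mode, letting loadStatsHandler record the
// per-iteration max load that the sub-clustering heuristics consume.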
runInEpochCollective("TemperedLB::doLBStages -> Rank_load_modeled", [=] {
// Perform the reduction for Rank_load_modeled -> processor load only
proxy_.allreduce<&TemperedLB::loadStatsHandler, collective::PlusOp>(
@@ -843,6 +851,8 @@ void TemperedLB::loadStatsHandler(std::vector<balance::LoadData> const& vec) {
auto const& in = vec[0];
new_imbalance_ = in.I();

max_load_over_iters_.push_back(in.max());
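// The history recorded above feeds swapClusters' stability check and
// trySubClustering's participation heuristic.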

auto this_node = theContext()->getNode();
if (this_node == 0) {
vt_debug_print(
@@ -1625,6 +1635,62 @@ auto TemperedLB::removeClusterToSend(SharedIDType shared_id) {
);
}

void TemperedLB::considerSubClustersAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
is_swapping_ = true;

// auto const& try_clusters = msg->locked_clusters;
// auto const& try_rank = msg->locked_node;
auto const& try_load = msg->locked_load;
auto const& try_total_bytes = msg->locked_bytes;

// Get the shared blocks currently residing on this rank
auto shared_blocks_here = getSharedBlocksHere();

// Shared IDs that, if transferred to the locked rank, would not put it over
// the memory limit
std::set<SharedIDType> possible_transfers;

for (auto const& shared_id : shared_blocks_here) {
if (try_total_bytes + shared_block_size_[shared_id] < mem_thresh_) {
possible_transfers.insert(shared_id);
}
}
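// Example with hypothetical numbers: if the locked rank reports
// try_total_bytes = 6 GiB against mem_thresh_ = 8 GiB, only shared blocks
// whose shared_block_size_ is under 2 GiB end up in possible_transfers.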

// Now, we will greedily try to find a combo of objects that will reduce our
// max

// We can prune some clusters out of the mix based on the requirement that
// the transfer be beneficial
auto const amount_over_average = this_new_load_ - target_max_load_;
auto const amount_under_average = target_max_load_ - try_load;

// Any sub-cluster with load smaller than amount_over_average or greater
// than amount_under_average can be skipped. We start by skipping all entire
// clusters that fail this criterion, since their sub-clusters will also be
// eliminated

std::set<SharedIDType> clusters_to_split;

for (auto const& [src_shared_id, src_cluster] : cur_clusters_) {
auto const& [src_cluster_bytes, src_cluster_load] = src_cluster;
if (
src_cluster_load < amount_over_average or
src_cluster_load > amount_under_average
) {
// skip it
} else {
clusters_to_split.insert(src_shared_id);
}
}
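// Example with hypothetical numbers: if this_new_load_ = 110 and
// target_max_load_ = 100, then amount_over_average = 10; if the locked
// rank's try_load = 85, then amount_under_average = 15. A cluster with load
// 12 falls in [10, 15] and is kept in clusters_to_split; clusters with load
// 5 or 40 are pruned. (clusters_to_split is computed but not yet consumed
// here; presumably the actual split-and-transfer logic lands in a later
// commit.)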

is_swapping_ = false;

if (pending_actions_.size() > 0) {
auto action = pending_actions_.back();
pending_actions_.pop_back();
action();
}
}

void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
is_swapping_ = true;

@@ -1849,7 +1915,11 @@ void TemperedLB::lockObtained(LockedInfoMsg* in_msg) {

auto action = [this, msg, cur_epoch]{
theMsg()->pushEpoch(cur_epoch);
-    considerSwapsAfterLock(msg);
+    if (is_subclustering_) {
+      considerSubClustersAfterLock(msg);
+    } else {
+      considerSwapsAfterLock(msg);
+    }
theMsg()->popEpoch(cur_epoch);
theTerm()->consume(cur_epoch);
};
@@ -1899,7 +1969,92 @@ void TemperedLB::satisfyLockRequest() {
}
}

void TemperedLB::trySubClustering() {
is_subclustering_ = true;
n_transfers_swap_ = 0;

auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: subCluster");
theTerm()->pushEpoch(lazy_epoch);

auto const this_node = theContext()->getNode();

// Only ranks that are close to the max load should do this...otherwise it's
// a waste.
// Very aggressive to start.
if (
auto n_iters = max_load_over_iters_.size();
this_new_load_ / max_load_over_iters_[n_iters - 1] > 0.80
) {
for (auto const& [try_rank, try_clusters] : other_rank_clusters_) {
auto const try_num_clusters = try_clusters.size();

// Only target ranks that have fewer clusters and are underloaded.
// Arbitrary constants for now
if (try_num_clusters < cur_clusters_.size()) {
if (
auto target_rank_load = load_info_.find(try_rank)->second;
target_rank_load < target_max_load_
) {
// The c-value is now the ratio of this rank's load to the target's. Prefer
// ranks that have less load and fewer clusters.
proxy_[try_rank].template send<&TemperedLB::tryLock>(
this_node, this_new_load_ / target_rank_load
);
}
}

}

} else {
// Do nothing--not loaded enough; this rank may instead be a target for load
}
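// Example with hypothetical numbers: with this_new_load_ = 95 and a last
// recorded max load of 100, the ratio 0.95 exceeds 0.80, so this rank
// participates; targeting an underloaded rank with load 70 sends a c-value
// of 95 / 70 ~= 1.36 along with the tryLock request.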

// Finalize the epoch; we have sent our initial round of messages.
// From here on, everything is message driven.
theTerm()->finishedEpoch(lazy_epoch);
theTerm()->popEpoch(lazy_epoch);
vt::runSchedulerThrough(lazy_epoch);

vt_debug_print(
normal, temperedlb,
"After subclustering iteration: total memory usage={}, shared blocks here={}, "
"memory_threshold={}, load={}\n", computeMemoryUsage(),
getSharedBlocksHere().size(), mem_thresh_, this_new_load_
);

int n_rejected = 0;

(Codacy Static Code Analysis notice on temperedlb.cc line 2024: The scope of the variable 'n_rejected' can be reduced.)

// Report on rejection rate in debug mode
if (theConfig()->vt_debug_temperedlb) {
runInEpochCollective("TemperedLB::swapClusters -> compute rejection", [=] {
proxy_.allreduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>(
n_rejected, n_transfers_swap_
);
});
}
}

void TemperedLB::swapClusters() {
// Do the test to see if we should start sub-clustering. This is probably far
// too aggressive. We could add a more conservative check, requiring more
// computation, that determines whether a cluster is blocking progress.
if (auto const len = max_load_over_iters_.size(); len > 2) {
double const i1 = max_load_over_iters_[len-1];
double const i2 = max_load_over_iters_[len-2];

vt_debug_print(
terse, temperedlb,
"swapClusters: check for subclustering: i1={}, i2={},"
" criteria=abs={} tol={}\n",
i1, i2, std::abs(i1 - i2), 0.01*i1
);

// the max is mostly stable
if (std::abs(i1 - i2) < 0.01*i1) {
trySubClustering();
return;
}
}
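// Example with hypothetical numbers: with max loads of 100.6 and 100.2 in
// the last two iterations, abs(100.2 - 100.6) = 0.4 < 0.01 * 100.2 = 1.002,
// so the max is considered stable and we switch to sub-clustering.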

n_transfers_swap_ = 0;

auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: swapClusters");
17 changes: 17 additions & 0 deletions src/vt/vrt/collection/balance/temperedlb/temperedlb.h
@@ -225,11 +225,24 @@ struct TemperedLB : BaseLB {
*/
void considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg);

/**
* \brief Consider possible subcluster transfers with all the up-to-date info
* from a rank
*
* \param[in] msg update message with all the info
*/
void considerSubClustersAfterLock(MsgSharedPtr<LockedInfoMsg> msg);

/**
* \brief Release a lock on a rank
*/
void releaseLock();

/**
* \brief Try sub-clustering---i.e., breaking up clusters to improve LB
*/
void trySubClustering();

/**
* \brief Give a cluster to a rank
*
@@ -376,6 +389,10 @@ struct TemperedLB : BaseLB {
int n_transfers_swap_ = 0;
/// Whether it's mid-swap or not
bool is_swapping_ = false;
/// History of the max load across ranks, one entry per iteration
std::vector<LoadType> max_load_over_iters_;
/// Whether we are sub-clustering
bool is_subclustering_ = false;
};

}}}} /* end namespace vt::vrt::collection::lb */
