Skip to content

Commit

Permalink
#1279: gossiplb: Use best of multiple trials
Browse files Browse the repository at this point in the history
  • Loading branch information
nlslatt committed Mar 8, 2021
1 parent 9905f2f commit 23a2469
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 40 deletions.
219 changes: 180 additions & 39 deletions src/vt/vrt/collection/balance/gossiplb/gossiplb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,24 @@ bool GossipLB::isOverloaded(LoadType load) const {
}

void GossipLB::inputParams(balance::SpecEntry* spec) {
std::vector<std::string> allowed{"f", "k", "i", "c"};
std::vector<std::string> allowed{"f", "k", "i", "c", "trials"};
spec->checkAllowedKeys(allowed);
using CriterionEnumUnder = typename std::underlying_type<CriterionEnum>::type;
auto default_c = static_cast<CriterionEnumUnder>(criterion_);
f_ = spec->getOrDefault<int32_t>("f", f_);
k_max_ = spec->getOrDefault<int32_t>("k", k_max_);
num_iters_ = spec->getOrDefault<int32_t>("i", num_iters_);
int32_t c = spec->getOrDefault<int32_t>("c", default_c);
criterion_ = static_cast<CriterionEnum>(c);
f_ = spec->getOrDefault<int32_t>("f", f_);
k_max_ = spec->getOrDefault<int32_t>("k", k_max_);
num_iters_ = spec->getOrDefault<int32_t>("i", num_iters_);
num_trials_ = spec->getOrDefault<int32_t>("trials", num_trials_);
int32_t c = spec->getOrDefault<int32_t>("c", default_c);
criterion_ = static_cast<CriterionEnum>(c);
}

void GossipLB::runLB() {
bool should_lb = false;

auto const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg);
auto const max = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::max);
auto const imb = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::imb);
auto const load = this_load;

if (avg > 0.0000000001) {
Expand All @@ -107,59 +109,185 @@ void GossipLB::runLB() {
}

if (should_lb) {
doLBStages();
doLBStages(imb);
} else {
migrationDone();
}
}

void GossipLB::doLBStages() {
for (iter_ = 0; iter_ < num_iters_; iter_++) {
bool first_iter = iter_ == 0;
void GossipLB::doLBStages(TimeType start_imb) {
std::unordered_map<ObjIDType, TimeType> best_objs;
LoadType best_load;
TimeType best_imb = start_imb+1;
uint16_t best_trial = 0;

debug_print(
gossiplb, node,
"GossipLB::doLBStages: (before) running iter_={}, num_iters_={}, load={}, new_load={}\n",
iter_, num_iters_, this_load, this_new_load_
);
auto this_node = theContext()->getNode();

if (first_iter) {
// Copy this node's object assignments to a local, mutable copy
cur_objs_ = *load_data;
this_new_load_ = this_load;
} else {
// Clear out data structures from previous iteration
selected_.clear();
underloaded_.clear();
load_info_.clear();
k_cur_ = 0;
is_overloaded_ = is_underloaded_ = false;
for (uint16_t trial = 0; trial < num_trials_; ++trial) {
// Clear out data structures
selected_.clear();
underloaded_.clear();
load_info_.clear();
k_cur_ = 0;
is_overloaded_ = is_underloaded_ = false;

for (iter_ = 0; iter_ < num_iters_; iter_++) {
bool first_iter = iter_ == 0;

debug_print(
gossiplb, node,
"GossipLB::doLBStages: (before) running iter_={}, num_iters_={}, load={}, new_load={}\n",
iter_, num_iters_, this_load, this_new_load_
);

if (first_iter) {
// Copy this node's object assignments to a local, mutable copy
cur_objs_ = *load_data;
this_new_load_ = this_load;
} else {
// Clear out data structures from previous iteration
selected_.clear();
underloaded_.clear();
load_info_.clear();
k_cur_ = 0;
is_overloaded_ = is_underloaded_ = false;
}

if (isOverloaded(this_new_load_)) {
is_overloaded_ = true;
} else if (isUnderloaded(this_new_load_)) {
is_underloaded_ = true;
}

inform();
decide();

debug_print(
gossiplb, node,
"GossipLB::doLBStages: (after) running iter_={}, num_iters_={}, load={}, new_load={}\n",
iter_, num_iters_, this_load, this_new_load_
);

runInEpochCollective([=] {
using StatsMsgType = balance::ProcStatsMsg;
using ReduceOp = collective::PlusOp<balance::LoadData>;
auto cb = vt::theCB()->makeBcast<
GossipLB, StatsMsgType, &GossipLB::gossipStatsHandler
>(this->proxy_);
// Perform the reduction for P_l -> processor load only
auto msg = makeMessage<StatsMsgType>(Statistic::P_l, this_new_load_);
this->proxy_.template reduce<ReduceOp>(msg,cb);
});

if (this_node == 0) {
vt_print(
gossiplb,
"GossipLB::doLBStages: trial={} iter={} imb={:0.2f}\n",
trial, iter_, new_imbalance_
);
}
}

if (isOverloaded(this_new_load_)) {
is_overloaded_ = true;
} else if (isUnderloaded(this_new_load_)) {
is_underloaded_ = true;
if (this_node == 0) {
vt_print(
gossiplb,
"GossipLB::doLBStages: trial={} imb={:0.2f}\n",
trial, new_imbalance_
);
}

inform();
decide();
if (cur_objs_.size() == 0) {
vt_print(
gossiplb,
"GossipLB::doLBStages: trial={} local_objs={}\n",
trial, cur_objs_.size()
);
}

debug_print(
gossiplb, node,
"GossipLB::doLBStages: (after) running iter_={}, num_iters_={}, load={}, new_load={}\n",
iter_, num_iters_, this_load, this_new_load_
);
if (new_imbalance_ <= start_imb && new_imbalance_ < best_imb) {
best_load = this_new_load_;
best_objs = cur_objs_;
best_imb = new_imbalance_;
best_trial = trial;
}

// Clear out for next try or for not migrating by default
cur_objs_.clear();
this_new_load_ = this_load;
}

// Update the load based on new object assignments
this_load = this_new_load_;
if (best_imb <= start_imb) {
cur_objs_ = best_objs;
this_load = this_new_load_ = best_load;
new_imbalance_ = best_imb;

// Update the load based on new object assignments
if (this_node == 0) {
vt_print(
gossiplb,
"GossipLB::doLBStages: chose trial={} with imb={:0.2f}\n",
best_trial, new_imbalance_
);
}
} else if (this_node == 0) {
vt_print(
gossiplb,
"GossipLB::doLBStages: rejected all trials because they would increase imbalance\n"
);
}

// Concretize lazy migrations by invoking the BaseLB object migration on new
// object node assignments
thunkMigrations();
}

// Broadcast target of the P_l (processor load) reduction issued from
// doLBStages. Stores the reduced imbalance in new_imbalance_ and lets node 0
// print the reduced max/min/avg/imbalance values.
//
// msg: reduced statistics message; its value provides max(), min(), avg()
//      and I() (the imbalance).
void GossipLB::gossipStatsHandler(StatsMsgType* msg) {
  auto reduced = msg->getConstVal();

  // Remember the imbalance so doLBStages can compare trials against it
  new_imbalance_ = reduced.I();

  bool const on_root = theContext()->getNode() == 0;
  if (on_root) {
    vt_print(
      gossiplb,
      "GossipLB::gossipStatsHandler: max={:0.2f} min={:0.2f} avg={:0.2f} imb={:0.2f}\n",
      reduced.max(), reduced.min(), reduced.avg(), reduced.I()
    );
  }

  // Disabled diagnostics: would report nodes whose new load is within 1% of
  // the reduced minimum or maximum, together with their local object count.
  /*
  if (this_new_load_ <= reduced.min() * 1.01) {
    vt_print(
      gossiplb,
      "GossipLB::gossipStatsHandler: new_load={:0.2f} min={:0.2f} count={}\n",
      this_new_load_, reduced.min(), cur_objs_.size()
    );
  }
  if (this_new_load_ >= reduced.max() * 0.99) {
    vt_print(
      gossiplb,
      "GossipLB::gossipStatsHandler: new_load={:0.2f} max={:0.2f} count={}\n",
      this_new_load_, reduced.max(), cur_objs_.size()
    );
  }
  */
}

// Broadcast target of the rejection-statistics reduction issued at the end of
// decide(). The incoming value holds the summed (PlusOp) counts of accepted
// transfers and rejected transfer attempts; node 0 prints both counts along
// with the rejection rate as a percentage.
//
// msg: reduced message whose value exposes n_rejected_ and n_transfers_.
void GossipLB::gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg) {
  auto in = msg->getConstVal();

  auto n_rejected = in.n_rejected_;
  auto n_transfers = in.n_transfers_;

  // Add in floating point so the total cannot overflow int, and report 0%
  // rather than NaN (0/0) when no transfer was attempted anywhere.
  double const n_attempts =
    static_cast<double>(n_rejected) + static_cast<double>(n_transfers);
  double const rej = n_attempts > 0.0
    ? static_cast<double>(n_rejected) / n_attempts * 100.0
    : 0.0;

  auto this_node = theContext()->getNode();
  if (this_node == 0) {
    vt_print(
      gossiplb,
      "GossipLB::gossipRejectionStatsHandler: n_transfers={} n_rejected={} rejection_rate={:0.1f}%\n",
      n_transfers, n_rejected, rej
    );
  }
}

void GossipLB::inform() {
debug_print(
gossiplb, node,
Expand Down Expand Up @@ -383,6 +511,8 @@ void GossipLB::decide() {
auto lazy_epoch = theTerm()->makeEpochCollective("GossipLB: decide");
theTerm()->addAction(lazy_epoch, [&decide_done] { decide_done = true; });

int n_transfers = 0, n_rejected = 0;

if (is_overloaded_) {
std::vector<NodeType> under = makeUnderloaded();
std::unordered_map<NodeType, ObjsType> migrate_objs;
Expand Down Expand Up @@ -438,13 +568,15 @@ void GossipLB::decide() {
);

if (eval) {
++n_transfers;
migrate_objs[selected_node][obj_id] = obj_load;

this_new_load_ -= obj_load;
selected_load += obj_load;

iter = cur_objs_.erase(iter);
} else {
++n_rejected;
iter++;
}

Expand All @@ -468,6 +600,15 @@ void GossipLB::decide() {
theTerm()->finishedEpoch(lazy_epoch);

theSched()->runSchedulerWhile([&decide_done]{ return not decide_done; });

runInEpochCollective([=] {
using ReduceOp = collective::PlusOp<RejectionStats>;
auto cb = vt::theCB()->makeBcast<
GossipLB, GossipRejectionStatsMsg, &GossipLB::gossipRejectionStatsHandler
>(this->proxy_);
auto msg = makeMessage<GossipRejectionStatsMsg>(n_rejected, n_transfers);
this->proxy_.template reduce<ReduceOp>(msg,cb);
});
}

void GossipLB::thunkMigrations() {
Expand Down
32 changes: 31 additions & 1 deletion src/vt/vrt/collection/balance/gossiplb/gossiplb.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@

namespace vt { namespace vrt { namespace collection { namespace lb {

// Pair of counters describing the outcome of transfer attempts in
// GossipLB::decide(): how many candidate object transfers were rejected and
// how many were accepted. Instances are combined with operator+ so they can
// be summed in a reduction.
struct RejectionStats {
  // Number of transfer attempts that were rejected
  int n_rejected_ = 0;
  // Number of transfers that were accepted
  int n_transfers_ = 0;

  RejectionStats() = default;

  RejectionStats(int n_rejected, int n_transfers)
    : n_rejected_(n_rejected),
      n_transfers_(n_transfers)
  { }

  // Component-wise sum of both counters
  friend RejectionStats operator+(RejectionStats a1, RejectionStats const& a2) {
    return RejectionStats{
      a1.n_rejected_ + a2.n_rejected_,
      a1.n_transfers_ + a2.n_transfers_
    };
  }
};

// Reduction message carrying a RejectionStats value; used by
// GossipLB::decide() to sum the per-node rejected/accepted transfer counts.
struct GossipRejectionStatsMsg : collective::ReduceTMsg<RejectionStats> {
  GossipRejectionStatsMsg() = default;

  // Build the payload from the local counters
  GossipRejectionStatsMsg(int n_rejected, int n_transfers)
    : ReduceTMsg<RejectionStats>(RejectionStats{n_rejected, n_transfers})
  { }

  // Take over an already-built payload
  GossipRejectionStatsMsg(RejectionStats&& rs)
    : ReduceTMsg<RejectionStats>(std::move(rs))
  { }
};

struct GossipLB : BaseLB {
using GossipMsg = balance::GossipMsg;
using NodeSetType = std::vector<NodeType>;
Expand All @@ -75,7 +101,7 @@ struct GossipLB : BaseLB {
void inputParams(balance::SpecEntry* spec) override;

protected:
void doLBStages();
void doLBStages(TimeType start_imb);
void inform();
void decide();
void migrate();
Expand All @@ -95,6 +121,8 @@ struct GossipLB : BaseLB {

void lazyMigrateObjsTo(EpochType epoch, NodeType node, ObjsType const& objs);
void inLazyMigrations(balance::LazyMigrationMsg* msg);
void gossipStatsHandler(StatsMsgType* msg);
void gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg);
void thunkMigrations();

void setupDone(ReduceMsgType* msg);
Expand All @@ -105,6 +133,7 @@ struct GossipLB : BaseLB {
uint8_t k_cur_ = 0;
uint16_t iter_ = 0;
uint16_t num_iters_ = 4;
uint16_t num_trials_ = 3;
std::random_device seed_;
std::unordered_map<NodeType, LoadType> load_info_ = {};
objgroup::proxy::Proxy<GossipLB> proxy_ = {};
Expand All @@ -114,6 +143,7 @@ struct GossipLB : BaseLB {
std::unordered_set<NodeType> underloaded_ = {};
std::unordered_map<ObjIDType, TimeType> cur_objs_ = {};
LoadType this_new_load_ = 0.0;
TimeType new_imbalance_ = 0.0;
CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine;
bool setup_done_ = false;
};
Expand Down

0 comments on commit 23a2469

Please sign in to comment.