Skip to content

Commit

Permalink
#1265: replay: update to use lb callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
nlslatt committed May 9, 2022
1 parent f27d0f8 commit ba3698d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 62 deletions.
53 changes: 34 additions & 19 deletions src/vt/vrt/collection/balance/workload_replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,28 +177,43 @@ void replayWorkloads(
runInEpochCollective("WorkloadReplayDriver -> runRealLB", [&] {
// run the load balancer but don't let it automatically migrate;
// instead, remember where the LB wanted to migrate objects
auto lb_reassignment = theLBManager()->selectStartLB(phase);

if (lb_reassignment) {
auto proposed_model = std::make_shared<ProposedReassignment>(
pre_lb_load_model,
WorkloadDataMigrator::updateCurrentNodes(lb_reassignment)
);
migratable_objects_here.clear();
for (auto it = proposed_model->begin(); it.isValid(); ++it) {
if ((*it).isMigratable()) {
migratable_objects_here.insert(*it);
vt_debug_print(
normal, replay,
"element {} is here on phase {} after LB\n", *it, phase
);
std::shared_ptr<ProposedReassignment> proposed_model = nullptr;
auto postLBWork = [&](ReassignmentMsg *msg) {
auto lb_reassignment = msg->reassignment;
if (lb_reassignment) {
proposed_model = std::make_shared<ProposedReassignment>(
pre_lb_load_model,
WorkloadDataMigrator::updateCurrentNodes(lb_reassignment)
);
migratable_objects_here.clear();
for (auto it = proposed_model->begin(); it.isValid(); ++it) {
if ((*it).isMigratable()) {
migratable_objects_here.insert(*it);
vt_debug_print(
normal, replay,
"element {} is here on phase {} after LB\n", *it, phase
);
}
}
}
}
vt_debug_print(
terse, replay,
"Number of objects after LB: {}\n", migratable_objects_here.size()
vt_debug_print(
terse, replay,
"Number of objects after LB: {}\n", migratable_objects_here.size()
);
runInEpochCollective("postLBWorkForReplay -> computeStats", [=] {
auto stats_cb = vt::theCB()->makeBcast<
LBManager, balance::NodeStatsMsg, &LBManager::statsHandler
>(theLBManager()->getProxy());
theLBManager()->computeStatistics(
proposed_model, false, phase, stats_cb
);
});
};
auto cb = theCB()->makeFunc<ReassignmentMsg>(
vt::pipe::LifetimeEnum::Once, postLBWork
);
theLBManager()->selectStartLB(phase, cb);
});
runInEpochCollective("WorkloadReplayDriver -> destroyLB", [&] {
theLBManager()->destroyLB();
Expand Down Expand Up @@ -240,7 +255,7 @@ objgroup::proxy::Proxy<WorkloadDataMigrator>
WorkloadDataMigrator::construct(std::shared_ptr<LoadModel> model_base) {
auto my_proxy = theObjGroup()->makeCollective<WorkloadDataMigrator>();
auto strat = my_proxy.get();
auto base_proxy = my_proxy.template registerBaseCollective<lb::BaseLB>();
auto base_proxy = my_proxy.template castToBase<lb::BaseLB>();
vt_debug_print(
verbose, replay,
"WorkloadDataMigrator proxy={} base_proxy={}\n",
Expand Down
79 changes: 36 additions & 43 deletions tests/unit/collection/test_workload_data_migrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ using namespace vt::tests::unit;
using vt::vrt::collection::balance::StatsData;
using vt::vrt::collection::balance::LoadModel;
using vt::vrt::collection::balance::ProposedReassignment;
using vt::vrt::collection::balance::ReassignmentMsg;
using vt::vrt::collection::balance::replay::WorkloadDataMigrator;

struct TestWorkloadDataMigrator : TestParallelHarness { };
Expand Down Expand Up @@ -108,28 +109,34 @@ setupBaseModel(PhaseType phase, std::shared_ptr<StatsData> sd) {
}

std::shared_ptr<ProposedReassignment>
shiftObjectsRight(
migrateObjects(
std::shared_ptr<LoadModel> base_load_model,
vt::PhaseType phase
vt::PhaseType phase,
vt::vrt::collection::balance::LBType balancer
) {
std::shared_ptr<ProposedReassignment> new_model = nullptr;

vt::runInEpochCollective("do shift", [&]{
using vt::vrt::collection::balance::LBType;
auto lb_reassignment = vt::theLBManager()->startLB(phase, LBType::RotateLB);
if (lb_reassignment != nullptr) {
vt_debug_print(
normal, replay,
"global_mig={}, depart={}, arrive={}\n",
lb_reassignment->global_migration_count,
lb_reassignment->depart_.size(),
lb_reassignment->arrive_.size()
);
new_model = std::make_shared<ProposedReassignment>(
base_load_model,
WorkloadDataMigrator::updateCurrentNodes(lb_reassignment)
);
}
vt::runInEpochCollective("migrate", [&]{
auto postLBWork = [&](ReassignmentMsg *msg) {
auto lb_reassignment = msg->reassignment;
if (lb_reassignment) {
vt_debug_print(
normal, replay,
"global_mig={}, depart={}, arrive={}\n",
lb_reassignment->global_migration_count,
lb_reassignment->depart_.size(),
lb_reassignment->arrive_.size()
);
new_model = std::make_shared<ProposedReassignment>(
base_load_model,
WorkloadDataMigrator::updateCurrentNodes(lb_reassignment)
);
}
};
auto cb = theCB()->makeFunc<ReassignmentMsg>(
vt::pipe::LifetimeEnum::Once, postLBWork
);
theLBManager()->startLB(phase, balancer, cb);
});

runInEpochCollective("destroy lb", [&]{
Expand All @@ -140,35 +147,21 @@ shiftObjectsRight(
}

std::shared_ptr<ProposedReassignment>
shiftObjectsRandomly(
shiftObjectsRight(
std::shared_ptr<LoadModel> base_load_model,
vt::PhaseType phase
) {
std::shared_ptr<ProposedReassignment> new_model = nullptr;

vt::runInEpochCollective("do shift", [&]{
using vt::vrt::collection::balance::LBType;
auto lb_reassignment = vt::theLBManager()->startLB(phase, LBType::RandomLB);
if (lb_reassignment != nullptr) {
vt_debug_print(
normal, replay,
"global_mig={}, depart={}, arrive={}\n",
lb_reassignment->global_migration_count,
lb_reassignment->depart_.size(),
lb_reassignment->arrive_.size()
);
new_model = std::make_shared<ProposedReassignment>(
base_load_model,
WorkloadDataMigrator::updateCurrentNodes(lb_reassignment)
);
}
});

runInEpochCollective("destroy lb", [&]{
vt::theLBManager()->destroyLB();
});
using vt::vrt::collection::balance::LBType;
return migrateObjects(base_load_model, phase, LBType::RotateLB);
}

return new_model;
std::shared_ptr<ProposedReassignment>
shiftObjectsRandomly(
std::shared_ptr<LoadModel> base_load_model,
vt::PhaseType phase
) {
using vt::vrt::collection::balance::LBType;
return migrateObjects(base_load_model, phase, LBType::RandomLB);
}


Expand Down

0 comments on commit ba3698d

Please sign in to comment.