Skip to content

Commit

Permalink
#1265: tests: verify statistics under replay
Browse files Browse the repository at this point in the history
  • Loading branch information
nlslatt committed Nov 9, 2023
1 parent dafa15d commit b9c9263
Showing 1 changed file with 138 additions and 8 deletions.
146 changes: 138 additions & 8 deletions tests/unit/collection/test_workload_data_migrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,133 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_2) {
}
}


/**
 * \brief Object-group state used to record a subset of load statistics per
 * phase and later compare freshly computed statistics against them.
 *
 * Two independent phase cursors are kept, both starting at the initial phase:
 *  - \c save_phase_ advances by one every time \c saveStatsHandler runs
 *  - \c comp_phase_ advances by \c lb_interval_ every time
 *    \c compStatsHandler runs
 */
struct StatsResults {
  /**
   * \param[in] initial_phase phase at which both the save and compare cursors
   * start
   * \param[in] lb_interval amount added to the compare cursor after each
   * comparison
   */
  StatsResults(PhaseType initial_phase, PhaseType lb_interval)
    : save_phase_(initial_phase),
      comp_phase_(initial_phase),
      lb_interval_(lb_interval) { }

  /// Phase under which the next saved statistics will be stored
  PhaseType save_phase_ = 0;
  /// Phase whose saved statistics the next comparison will read
  PhaseType comp_phase_ = 0;
  /// Stride applied to \c comp_phase_ after every comparison
  PhaseType lb_interval_ = 1;

  /// Per-phase min() of the \c O_l statistic
  std::unordered_map<PhaseType, double> O_min_;
  /// Per-phase max() of the \c O_l statistic
  std::unordered_map<PhaseType, double> O_max_;
  /// Per-phase \c N_ of the \c O_l statistic (presumably the cardinality)
  std::unordered_map<PhaseType, double> O_car_;
  /// Per-phase sum() of the \c P_l statistic
  std::unordered_map<PhaseType, double> P_sum_;

  using StatsMsgType = vt::vrt::collection::balance::NodeStatsMsg;
  using Statistic = vt::vrt::collection::lb::Statistic;

  /**
   * \brief Record the tracked statistics from \c msg under \c save_phase_,
   * then advance \c save_phase_ by one.
   *
   * \param[in] msg message carrying the vector of computed statistics
   */
  void saveStatsHandler(StatsMsgType* msg) {
    // Bind by const reference: safe whether getConstVal() returns a reference
    // or a temporary, and avoids copying the vector when it is a reference.
    auto const& in_stat_vec = msg->getConstVal();

    auto const& this_node = vt::theContext()->getNode();

    if (this_node == 0) {
      // Report the phase actually being written (save_phase_); comp_phase_
      // does not move while statistics are being saved.
      vt_print(replay, "Saving subset of statistics for phase {}\n", save_phase_);
    }

    for (auto const& st : in_stat_vec) {
      auto const stat = st.stat_;
      if (stat == Statistic::P_l) {
        P_sum_[save_phase_] = st.sum();
      } else if (stat == Statistic::O_l) {
        O_min_[save_phase_] = st.min();
        O_max_[save_phase_] = st.max();
        O_car_[save_phase_] = st.N_;
      }
    }

    ++save_phase_;
  }

  /**
   * \brief Compare the tracked statistics from \c msg against those saved for
   * \c comp_phase_, then advance \c comp_phase_ by \c lb_interval_.
   *
   * Mismatches are reported through \c EXPECT_EQ. The saved values are read
   * with \c operator[], so a phase that was never saved compares against a
   * value-initialized \c 0.0.
   *
   * \param[in] msg message carrying the vector of computed statistics
   */
  void compStatsHandler(StatsMsgType* msg) {
    auto const& in_stat_vec = msg->getConstVal();

    auto const& this_node = vt::theContext()->getNode();

    if (this_node == 0) {
      vt_print(replay, "Comparing subset of post-LB statistics for phase {}\n", comp_phase_);
    }

    for (auto const& st : in_stat_vec) {
      auto const stat = st.stat_;
      if (stat == Statistic::P_l) {
        EXPECT_EQ(P_sum_[comp_phase_], st.sum());
      } else if (stat == Statistic::O_l) {
        EXPECT_EQ(O_min_[comp_phase_], st.min());
        EXPECT_EQ(O_max_[comp_phase_], st.max());
        EXPECT_EQ(O_car_[comp_phase_], st.N_);
      }
    }

    comp_phase_ += lb_interval_;
  }
};

std::shared_ptr<ProposedReassignment>
migrateObjectsAndDoStatistics(
std::shared_ptr<LoadModel> base_load_model,
vt::PhaseType phase,
vt::vrt::collection::balance::LBType balancer,
vt::objgroup::proxy::Proxy<StatsResults> o_proxy
) {
std::shared_ptr<ProposedReassignment> new_model = nullptr;

vt::runInEpochCollective("migrate", [&]{
auto postLBWork = [&](ReassignmentMsg *msg) {
auto lb_reassignment = msg->reassignment;
if (lb_reassignment) {
vt_debug_print(
normal, replay,
"global_mig={}, depart={}, arrive={}\n",
lb_reassignment->global_migration_count,
lb_reassignment->depart_.size(),
lb_reassignment->arrive_.size()
);
new_model = std::make_shared<ProposedReassignment>(
base_load_model,
WorkloadDataMigrator::updateCurrentNodes(lb_reassignment)
);
runInEpochCollective("computeAndStoreStats", [=] {
auto stats_cb = vt::theCB()->makeBcast<
StatsResults, StatsResults::StatsMsgType,
&StatsResults::saveStatsHandler
>(o_proxy);
theLBManager()->computeStatistics(new_model, false, phase, stats_cb);
});
}
};
auto cb = theCB()->makeFunc<ReassignmentMsg>(
vt::pipe::LifetimeEnum::Once, postLBWork
);
theLBManager()->startLB(phase, balancer, cb);
});

runInEpochCollective("destroy lb", [&]{
vt::theLBManager()->destroyLB();
});

return new_model;
}

/**
 * \brief Convenience wrapper that runs \c migrateObjectsAndDoStatistics with
 * the \c RotateLB load balancer.
 *
 * \param[in] base_load_model model forwarded to the migration helper
 * \param[in] phase phase forwarded to the migration helper
 * \param[in] o_proxy object-group proxy forwarded to the migration helper
 *
 * \return whatever \c migrateObjectsAndDoStatistics returns
 */
std::shared_ptr<ProposedReassignment>
shiftObjectsRightAndDoStatistics(
  std::shared_ptr<LoadModel> base_load_model,
  vt::PhaseType phase, vt::objgroup::proxy::Proxy<StatsResults> o_proxy
) {
  auto const rotate_lb = vt::vrt::collection::balance::LBType::RotateLB;
  return migrateObjectsAndDoStatistics(
    base_load_model, phase, rotate_lb, o_proxy
  );
}

std::shared_ptr<LBDataHolder>
setupManyWorkloads(
PhaseType initial_phase, PhaseType num_phases, size_t numElements
PhaseType initial_phase, PhaseType num_phases, size_t numElements,
vt::objgroup::proxy::Proxy<StatsResults> o_proxy
) {
auto const& this_node = vt::theContext()->getNode();

Expand Down Expand Up @@ -603,7 +727,7 @@ setupManyWorkloads(
auto base_load_model = setupBaseModel(phase, lbdh);

std::shared_ptr<ProposedReassignment> not_home_model =
shiftObjectsRight(base_load_model, phase);
shiftObjectsRightAndDoStatistics(base_load_model, phase, o_proxy);

std::set<ElementIDStruct> migratable_objects_here;
for (auto it = not_home_model->begin(); it.isValid(); ++it) {
Expand Down Expand Up @@ -649,19 +773,25 @@ struct TestWorkloadReplay : TestParallelHarness {
#endif
};

TEST_F(TestWorkloadReplay, test_run_replay_no_verify) {
TEST_F(TestWorkloadReplay, test_run_replay_verify_some_stats) {
PhaseType initial_phase = 1;
PhaseType num_phases = 5;
const size_t numElements = 5;
const PhaseType lb_interval = 2; // make sure this matches the harness above

auto o_proxy = vt::theObjGroup()->makeCollective<StatsResults>(
initial_phase, lb_interval
);

// first set up the workloads to replay, moving them around by phase
auto lbdh = setupManyWorkloads(initial_phase, num_phases, numElements);
auto lbdh = setupManyWorkloads(
initial_phase, num_phases, numElements, o_proxy
);

using LBManager = vt::vrt::collection::balance::LBManager;
using NodeStatsMsg = vt::vrt::collection::balance::NodeStatsMsg;
// make our own stats callback so that we can check the results
auto stats_cb = vt::theCB()->makeBcast<
LBManager, NodeStatsMsg, &LBManager::statsHandler
>(vt::theLBManager()->getProxy());
StatsResults, StatsResults::StatsMsgType, &StatsResults::compStatsHandler
>(o_proxy);

// then replay them but allow the lb to place objects differently
vt::vrt::collection::balance::replay::replayWorkloads(
Expand Down

0 comments on commit b9c9263

Please sign in to comment.