From b9c9263842d6b258394fac6a65ade338e81f831b Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 9 May 2022 13:08:26 -0700 Subject: [PATCH] #1265: tests: verify statistics under replay --- .../collection/test_workload_data_migrator.cc | 146 +++++++++++++++++- 1 file changed, 138 insertions(+), 8 deletions(-) diff --git a/tests/unit/collection/test_workload_data_migrator.cc b/tests/unit/collection/test_workload_data_migrator.cc index f7911ef0bb..4a731ab378 100644 --- a/tests/unit/collection/test_workload_data_migrator.cc +++ b/tests/unit/collection/test_workload_data_migrator.cc @@ -563,9 +563,133 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_2) { } } + +struct StatsResults { + StatsResults(PhaseType initial_phase, PhaseType lb_interval) + : save_phase_(initial_phase), + comp_phase_(initial_phase), + lb_interval_(lb_interval) { } + + PhaseType save_phase_ = 0; + PhaseType comp_phase_ = 0; + PhaseType lb_interval_ = 1; + + std::unordered_map O_min_; + std::unordered_map O_max_; + std::unordered_map O_car_; + std::unordered_map P_sum_; + + using StatsMsgType = vt::vrt::collection::balance::NodeStatsMsg; + using Statistic = vt::vrt::collection::lb::Statistic; + + void saveStatsHandler(StatsMsgType* msg) { + auto in_stat_vec = msg->getConstVal(); + + auto const& this_node = vt::theContext()->getNode(); + + if (this_node == 0) { + vt_print(replay, "Saving subset of statistics for phase {}\n", comp_phase_); + } + + for (auto&& st : in_stat_vec) { + auto stat = st.stat_; + if (stat == Statistic::P_l) { + P_sum_[save_phase_] = st.sum(); + } else if (stat == Statistic::O_l) { + O_min_[save_phase_] = st.min(); + O_max_[save_phase_] = st.max(); + O_car_[save_phase_] = st.N_; + } + } + + ++save_phase_; + } + + void compStatsHandler(StatsMsgType* msg) { + auto in_stat_vec = msg->getConstVal(); + + auto const& this_node = vt::theContext()->getNode(); + + if (this_node == 0) { + vt_print(replay, "Comparing subset of post-LB statistics for phase {}\n", comp_phase_); + } + + for (auto&& st : in_stat_vec) { + auto stat = st.stat_; + if (stat == Statistic::P_l) { + EXPECT_EQ(P_sum_[comp_phase_], st.sum()); + } else if (stat == Statistic::O_l) { + EXPECT_EQ(O_min_[comp_phase_], st.min()); + EXPECT_EQ(O_max_[comp_phase_], st.max()); + EXPECT_EQ(O_car_[comp_phase_], st.N_); + } + } + + comp_phase_ += lb_interval_; + } +}; + +std::shared_ptr +migrateObjectsAndDoStatistics( + std::shared_ptr base_load_model, + vt::PhaseType phase, + vt::vrt::collection::balance::LBType balancer, + vt::objgroup::proxy::Proxy o_proxy +) { + std::shared_ptr new_model = nullptr; + + vt::runInEpochCollective("migrate", [&]{ + auto postLBWork = [&](ReassignmentMsg *msg) { + auto lb_reassignment = msg->reassignment; + if (lb_reassignment) { + vt_debug_print( + normal, replay, + "global_mig={}, depart={}, arrive={}\n", + lb_reassignment->global_migration_count, + lb_reassignment->depart_.size(), + lb_reassignment->arrive_.size() + ); + new_model = std::make_shared( + base_load_model, + WorkloadDataMigrator::updateCurrentNodes(lb_reassignment) + ); + runInEpochCollective("computeAndStoreStats", [=] { + auto stats_cb = vt::theCB()->makeBcast< + StatsResults, StatsResults::StatsMsgType, + &StatsResults::saveStatsHandler + >(o_proxy); + theLBManager()->computeStatistics(new_model, false, phase, stats_cb); + }); + } + }; + auto cb = theCB()->makeFunc( + vt::pipe::LifetimeEnum::Once, postLBWork + ); + theLBManager()->startLB(phase, balancer, cb); + }); + + runInEpochCollective("destroy lb", [&]{ + vt::theLBManager()->destroyLB(); + }); + + return new_model; +} + +std::shared_ptr +shiftObjectsRightAndDoStatistics( + std::shared_ptr base_load_model, + vt::PhaseType phase, vt::objgroup::proxy::Proxy o_proxy +) { + using vt::vrt::collection::balance::LBType; + return migrateObjectsAndDoStatistics( + base_load_model, phase, LBType::RotateLB, o_proxy + ); +} + std::shared_ptr setupManyWorkloads( - PhaseType initial_phase, PhaseType num_phases, size_t numElements + PhaseType initial_phase, PhaseType num_phases, size_t numElements, + vt::objgroup::proxy::Proxy o_proxy ) { auto const& this_node = vt::theContext()->getNode(); @@ -603,7 +727,7 @@ setupManyWorkloads( auto base_load_model = setupBaseModel(phase, lbdh); std::shared_ptr not_home_model = - shiftObjectsRight(base_load_model, phase); + shiftObjectsRightAndDoStatistics(base_load_model, phase, o_proxy); std::set migratable_objects_here; for (auto it = not_home_model->begin(); it.isValid(); ++it) { @@ -649,19 +773,25 @@ struct TestWorkloadReplay : TestParallelHarness { #endif }; -TEST_F(TestWorkloadReplay, test_run_replay_no_verify) { +TEST_F(TestWorkloadReplay, test_run_replay_verify_some_stats) { PhaseType initial_phase = 1; PhaseType num_phases = 5; const size_t numElements = 5; + const PhaseType lb_interval = 2; // make sure this matches the harness above + + auto o_proxy = vt::theObjGroup()->makeCollective( + initial_phase, lb_interval + ); // first set up the workloads to replay, moving them around by phase - auto lbdh = setupManyWorkloads(initial_phase, num_phases, numElements); + auto lbdh = setupManyWorkloads( + initial_phase, num_phases, numElements, o_proxy + ); - using LBManager = vt::vrt::collection::balance::LBManager; - using NodeStatsMsg = vt::vrt::collection::balance::NodeStatsMsg; + // make our own stats callback so that we can check the results auto stats_cb = vt::theCB()->makeBcast< - LBManager, NodeStatsMsg, &LBManager::statsHandler - >(vt::theLBManager()->getProxy()); + StatsResults, StatsResults::StatsMsgType, &StatsResults::compStatsHandler + >(o_proxy); // then replay them but allow the lb to place objects differently vt::vrt::collection::balance::replay::replayWorkloads(