Skip to content

Commit

Permalink
#1265: replay: update to reflect stats renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
nlslatt committed May 9, 2022
1 parent ba3698d commit c4454cf
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 53 deletions.
18 changes: 9 additions & 9 deletions src/vt/vrt/collection/balance/workload_replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

#include "vt/config.h"
#include "vt/vrt/collection/balance/workload_replay.h"
#include "vt/vrt/collection/balance/stats_data.h"
#include "vt/vrt/collection/balance/lb_data_holder.h"
#include "vt/vrt/collection/balance/lb_invoke/lb_manager.h"
#include "vt/utils/json/json_reader.h"

Expand All @@ -58,15 +58,15 @@ void replayWorkloads(
PhaseType initial_phase, PhaseType phases_to_run
) {
// read in object loads from json files
auto const filename = theConfig()->getLBStatsFileIn();
auto const filename = theConfig()->getLBDataFileIn();
auto workloads = readInWorkloads(filename);

replayWorkloads(initial_phase, phases_to_run, workloads);
}

void replayWorkloads(
PhaseType initial_phase, PhaseType phases_to_run,
std::shared_ptr<StatsData> workloads
std::shared_ptr<LBDataHolder> workloads
) {
using ObjIDType = elm::ElementIDStruct;

Expand Down Expand Up @@ -222,13 +222,13 @@ void replayWorkloads(
}
}

std::shared_ptr<StatsData>
std::shared_ptr<LBDataHolder>
readInWorkloads(const std::string &filename) {
using util::json::Reader;

Reader r{filename};
auto json = r.readFile();
auto sd = std::make_shared<StatsData>(*json);
auto sd = std::make_shared<LBDataHolder>(*json);

for (auto &phase_data : sd->node_data_) {
vt_debug_print(
Expand Down Expand Up @@ -329,7 +329,7 @@ WorkloadDataMigrator::relocateMisplacedWorkloadsHome(
) {
std::shared_ptr<ProposedReassignment> move_home_model = nullptr;

runInEpochCollective("WorkloadDataMigrator -> migrateStatsDataHome", [&] {
runInEpochCollective("WorkloadDataMigrator -> migrateLBDataHome", [&] {
auto norm_lb_proxy = WorkloadDataMigrator::construct(model_base);
auto normalizer = norm_lb_proxy.get();
move_home_model = normalizer->createModelToMoveWorkloadsHome(
Expand All @@ -349,7 +349,7 @@ WorkloadDataMigrator::relocateMisplacedWorkloadsHere(
) {
std::shared_ptr<ProposedReassignment> move_here_model = nullptr;

runInEpochCollective("WorkloadDataMigrator -> migrateStatsDataHere", [&] {
runInEpochCollective("WorkloadDataMigrator -> migrateLBDataHere", [&] {
auto norm_lb_proxy = WorkloadDataMigrator::construct(model_base);
auto normalizer = norm_lb_proxy.get();
move_here_model = normalizer->createModelToMoveWorkloadsHere(
Expand All @@ -372,7 +372,7 @@ WorkloadDataMigrator::createModelToMoveWorkloadsHome(
"constructing load model to get loads from file location to home\n"
);

runInEpochCollective("WorkloadDataMigrator -> transferStatsHome", [&] {
runInEpochCollective("WorkloadDataMigrator -> transferLBDataHome", [&] {
for (auto workload_id : *model_base) {
if (workload_id.isMigratable()) {
// if the object belongs here, do nothing; otherwise, "transfer" it to
Expand Down Expand Up @@ -408,7 +408,7 @@ WorkloadDataMigrator::createModelToMoveWorkloadsHere(
"constructing load model to get loads from home to here\n"
);

runInEpochCollective("WorkloadDataMigrator -> transferStatsHere", [&] {
runInEpochCollective("WorkloadDataMigrator -> transferLBDataHere", [&] {
for (auto workload_id : migratable_objects_here) {
// if the object is already here, do nothing; otherwise, "transfer" it
// from the home rank so that we will have the needed workload data
Expand Down
14 changes: 7 additions & 7 deletions src/vt/vrt/collection/balance/workload_replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

#include "vt/config.h"
#include "vt/elm/elm_id.h"
#include "vt/vrt/collection/balance/stats_data.h"
#include "vt/vrt/collection/balance/lb_data_holder.h"
#include "vt/vrt/collection/balance/baselb/baselb.h"
#include "vt/vrt/collection/balance/model/load_model.h"
#include "vt/vrt/collection/balance/model/proposed_reassignment.h"
Expand All @@ -65,8 +65,8 @@ namespace balance { namespace replay {
* \param[in] initial_phase the first phase to replay
* \param[in] phases_to_run how many phases to replay
*
* The json files specified by the command-line arguments --vt_lb_stats_file_in
* and --vt_lb_stats_dir_in will be imported and the LB data contained within
* The json files specified by the command-line arguments --vt_lb_data_file_in
* and --vt_lb_data_dir_in will be imported and the LB data contained within
* will be fed through the load balancer(s) specified on the vt command-line
* on each requested phase, allowing new load balancing decisions to happen.
* There is no requirement to colocate the LB data on the same rank as the
Expand All @@ -91,17 +91,17 @@ void replayWorkloads(
*/
void replayWorkloads(
PhaseType initial_phase, PhaseType phases_to_run,
std::shared_ptr<StatsData> workloads
std::shared_ptr<LBDataHolder> workloads
);

/**
* \brief Build a StatsData object from the LB data in a json file
* \brief Build an LBDataHolder object from the LB data in a json file
*
* \param[in] filename read in LB data from the specified json file
*
* \return the StatsData object built from the LB data
* \return the LBDataHolder object built from the LB data
*/
std::shared_ptr<StatsData>
std::shared_ptr<LBDataHolder>
readInWorkloads(const std::string &filename);


Expand Down
70 changes: 35 additions & 35 deletions tests/unit/collection/test_workload_data_migrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
#include "vt/elm/elm_id.h"
#include "vt/elm/elm_id_bits.h"
#include "vt/vrt/collection/balance/lb_common.h"
#include "vt/vrt/collection/balance/stats_data.h"
#include "vt/vrt/collection/balance/lb_data_holder.h"
#include "vt/vrt/collection/balance/lb_invoke/lb_manager.h"
#include "vt/vrt/collection/balance/workload_replay.h"
#include "vt/vrt/collection/balance/model/proposed_reassignment.h"
Expand All @@ -60,15 +60,15 @@ namespace vt { namespace tests { namespace unit { namespace replay {

using namespace vt::tests::unit;

using vt::vrt::collection::balance::StatsData;
using vt::vrt::collection::balance::LBDataHolder;
using vt::vrt::collection::balance::LoadModel;
using vt::vrt::collection::balance::ProposedReassignment;
using vt::vrt::collection::balance::ReassignmentMsg;
using vt::vrt::collection::balance::replay::WorkloadDataMigrator;

struct TestWorkloadDataMigrator : TestParallelHarness { };

std::shared_ptr<StatsData>
std::shared_ptr<LBDataHolder>
setupWorkloads(PhaseType phase, size_t numElements) {
auto const& this_node = vt::theContext()->getNode();

Expand All @@ -82,24 +82,24 @@ setupWorkloads(PhaseType phase, size_t numElements) {
);
}

auto sd = std::make_shared<StatsData>();
auto lbdh = std::make_shared<LBDataHolder>();

for (auto&& elmID : myElemList) {
double tval = elmID.id * 2;
sd->node_data_[phase][elmID].whole_phase_load = tval;
auto &subphase_loads = sd->node_data_[phase][elmID].subphase_loads;
lbdh->node_data_[phase][elmID].whole_phase_load = tval;
auto &subphase_loads = lbdh->node_data_[phase][elmID].subphase_loads;
subphase_loads.push_back(elmID.id % 2 ? tval : 0);
subphase_loads.push_back(elmID.id % 2 ? 0 : tval);
}

return sd;
return lbdh;
}

std::shared_ptr<LoadModel>
setupBaseModel(PhaseType phase, std::shared_ptr<StatsData> sd) {
setupBaseModel(PhaseType phase, std::shared_ptr<LBDataHolder> lbdh) {
auto base_load_model = vt::theLBManager()->getBaseLoadModel();
// force it to use our json workloads, not anything it may have collected
base_load_model->setLoads(&sd->node_data_, &sd->node_comm_);
base_load_model->setLoads(&lbdh->node_data_, &lbdh->node_comm_);

vt::runInEpochCollective("updateLoads", [&]{
base_load_model->updateLoads(phase);
Expand Down Expand Up @@ -172,8 +172,8 @@ TEST_F(TestWorkloadDataMigrator, test_normalize_call) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

vt::objgroup::proxy::Proxy<WorkloadDataMigrator> norm_lb_proxy;
std::shared_ptr<ProposedReassignment> new_model = nullptr;
Expand Down Expand Up @@ -231,8 +231,8 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_home) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

// move everything off the home node
std::shared_ptr<ProposedReassignment> not_home_model = shiftObjectsRight(
Expand Down Expand Up @@ -280,8 +280,8 @@ TEST_F(TestWorkloadDataMigrator, test_move_some_data_home) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

// move everything off the home node
std::shared_ptr<ProposedReassignment> not_home_model = shiftObjectsRight(
Expand Down Expand Up @@ -342,8 +342,8 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_home) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

// move everything off the home node
std::shared_ptr<ProposedReassignment> not_home_model = shiftObjectsRight(
Expand Down Expand Up @@ -394,8 +394,8 @@ TEST_F(TestWorkloadDataMigrator, test_move_some_data_here_from_home) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

// move everything off the home node
std::shared_ptr<ProposedReassignment> not_home_model = shiftObjectsRight(
Expand Down Expand Up @@ -456,8 +456,8 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_1) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

// shift the workloads to not be home
std::shared_ptr<ProposedReassignment> workloads_not_home_model =
Expand Down Expand Up @@ -510,8 +510,8 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_2) {
PhaseType phase = 0;
const size_t numElements = 5;

auto sd = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, sd);
auto lbdh = setupWorkloads(phase, numElements);
auto base_load_model = setupBaseModel(phase, lbdh);

// put the workloads wherever
std::shared_ptr<ProposedReassignment> workloads_whereever_model =
Expand Down Expand Up @@ -558,7 +558,7 @@ TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_2) {
}
}

std::shared_ptr<StatsData>
std::shared_ptr<LBDataHolder>
setupManyWorkloads(
PhaseType initial_phase, PhaseType num_phases, size_t numElements
) {
Expand All @@ -574,24 +574,24 @@ setupManyWorkloads(
);
}

auto sd = std::make_shared<StatsData>();
auto lbdh = std::make_shared<LBDataHolder>();

PhaseType stop_phase = initial_phase + num_phases;
for (PhaseType phase = initial_phase; phase < stop_phase; ++phase) {
for (size_t ii = 0; ii < numElements; ++ii) {
auto elmID = myElemList[ii];
double tval = this_node + (ii + 10) * 2;
sd->node_data_[phase][elmID].whole_phase_load = tval + phase;
auto &subphase_loads = sd->node_data_[phase][elmID].subphase_loads;
lbdh->node_data_[phase][elmID].whole_phase_load = tval + phase;
auto &subphase_loads = lbdh->node_data_[phase][elmID].subphase_loads;
subphase_loads.push_back(elmID.id % 2 ? tval : phase);
subphase_loads.push_back(elmID.id % 2 ? phase : tval);
}
}

auto scrambled_sd = std::make_shared<StatsData>();
auto scrambled_lbdh = std::make_shared<LBDataHolder>();

for (PhaseType phase = initial_phase; phase < stop_phase; ++phase) {
auto base_load_model = setupBaseModel(phase, sd);
auto base_load_model = setupBaseModel(phase, lbdh);

std::shared_ptr<ProposedReassignment> not_home_model =
shiftObjectsRight(base_load_model, phase);
Expand All @@ -613,20 +613,20 @@ setupManyWorkloads(
for (auto it = here_model->begin(); it.isValid(); ++it) {
auto obj_id = *it;
using vt::vrt::collection::balance::PhaseOffset;
scrambled_sd->node_data_[phase][obj_id].whole_phase_load =
scrambled_lbdh->node_data_[phase][obj_id].whole_phase_load =
here_model->getWork(
obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE}
);
scrambled_sd->node_data_[phase][*it].subphase_loads.push_back(
scrambled_lbdh->node_data_[phase][*it].subphase_loads.push_back(
here_model->getWork(obj_id, {PhaseOffset::NEXT_PHASE, 0})
);
scrambled_sd->node_data_[phase][*it].subphase_loads.push_back(
scrambled_lbdh->node_data_[phase][*it].subphase_loads.push_back(
here_model->getWork(obj_id, {PhaseOffset::NEXT_PHASE, 1})
);
}
}

return scrambled_sd;
return scrambled_lbdh;
}

struct TestWorkloadReplay : TestParallelHarness {
Expand All @@ -645,11 +645,11 @@ TEST_F(TestWorkloadReplay, test_run_replay_no_verify) {
const size_t numElements = 5;

// first set up the workloads to replay, moving them around by phase
auto sd = setupManyWorkloads(initial_phase, num_phases, numElements);
auto lbdh = setupManyWorkloads(initial_phase, num_phases, numElements);

// then replay them but allow the lb to place objects differently
vt::vrt::collection::balance::replay::replayWorkloads(
initial_phase, num_phases, sd
initial_phase, num_phases, lbdh
);
}

Expand Down
4 changes: 2 additions & 2 deletions tools/workload_replay/simulate_replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ int main(int argc, char** argv) {
argc != 3,
"Must have two app-specific arguments: <initial phase> <phases to run>\n"
"The json workload files needs to be specified using\n"
"--vt_lb_stats_file_in and --vt_lb_stats_dir_in"
"--vt_lb_data_file_in and --vt_lb_data_dir_in"
);

// initial phase to simulate
Expand All @@ -62,7 +62,7 @@ int main(int argc, char** argv) {
PhaseType phases_to_run = atoi(argv[2]);

// the workloads used will be those specified with the command-line arguments
// --vt_lb_stats_file_in and --vt_lb_stats_dir_in
// --vt_lb_data_file_in and --vt_lb_data_dir_in
vt::vrt::collection::balance::replay::replayWorkloads(
initial_phase, phases_to_run
);
Expand Down

0 comments on commit c4454cf

Please sign in to comment.