diff --git a/scripts/JSON_data_files_validator.py b/scripts/JSON_data_files_validator.py index 72a5c51c38..a14c083fc6 100644 --- a/scripts/JSON_data_files_validator.py +++ b/scripts/JSON_data_files_validator.py @@ -434,13 +434,14 @@ def validate_comm_links(all_jsons): for data in all_jsons: tasks = data["phases"][n]["tasks"] - id_key = "id" if "id" in tasks[0]["entity"] else "seq_id" - task_ids.update({int(task["entity"][id_key]) for task in tasks}) + task_ids.update( + {int(task["entity"].get("id", task["entity"].get("seq_id"))) for task in tasks} + ) if data["phases"][n].get("communications") is not None: comms = data["phases"][n]["communications"] - comm_ids.update({int(comm["from"][id_key]) for comm in comms}) - comm_ids.update({int(comm["to"][id_key]) for comm in comms}) + comm_ids.update({int(comm["from"].get("id", comm["from"].get("seq_id"))) for comm in comms}) + comm_ids.update({int(comm["to"].get("id", comm["to"].get("seq_id"))) for comm in comms}) if not comm_ids.issubset(task_ids): logging.error( diff --git a/scripts/LBDatafile_schema.py b/scripts/LBDatafile_schema.py index 743fff574e..9dca0276c5 100644 --- a/scripts/LBDatafile_schema.py +++ b/scripts/LBDatafile_schema.py @@ -1,9 +1,16 @@ from schema import And, Optional, Schema -def validate_id_and_seq_id(field): - """Ensure that either seq_id or id is provided.""" +def validate_ids(field): + """ + Ensure that 1) either seq_id or id is provided, + and 2) if an object is migratable, collection_id has been set. + """ if 'seq_id' not in field and 'id' not in field: raise ValueError('Either id (bit-encoded) or seq_id must be provided.') + + if field.get('migratable') and 'seq_id' in field and 'collection_id' not in field: + raise ValueError('If an entity is migratable, it must have a collection_id.') + return field LBDatafile_schema = Schema( @@ -45,7 +52,7 @@ def validate_id_and_seq_id(field): 'type': str, 'migratable': bool, Optional('objgroup_id'): int - }, validate_id_and_seq_id), + }, validate_ids), 'node': int, 'resource': str, Optional('subphases'): [ @@ -71,7 +78,7 @@ def validate_id_and_seq_id(field): Optional('migratable'): bool, Optional('index'): [int], Optional('objgroup_id'): int, - }, validate_id_and_seq_id), + }, validate_ids), 'messages': int, 'from': And({ 'type': str, @@ -82,7 +89,7 @@ def validate_id_and_seq_id(field): Optional('migratable'): bool, Optional('index'): [int], Optional('objgroup_id'): int, - }, validate_id_and_seq_id), + }, validate_ids), 'bytes': float } ], diff --git a/src/vt/configs/arguments/app_config.h b/src/vt/configs/arguments/app_config.h index aa83d05588..2ddb024a31 100644 --- a/src/vt/configs/arguments/app_config.h +++ b/src/vt/configs/arguments/app_config.h @@ -160,6 +160,7 @@ struct AppConfig { bool vt_lb_self_migration = false; bool vt_lb_spec = false; std::string vt_lb_spec_file = ""; + bool vt_lb_run_lb_first_phase = false; bool vt_no_detect_hang = false; diff --git a/src/vt/configs/arguments/args.cc b/src/vt/configs/arguments/args.cc index ee3e6f8c25..0aae6c0572 100644 --- a/src/vt/configs/arguments/args.cc +++ b/src/vt/configs/arguments/args.cc @@ -913,6 +913,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) { auto lb_self_migration = "Allow load balancer to migrate objects to the same node"; auto lb_spec = "Enable LB spec file (defines which phases output LB data)"; auto lb_spec_file = "File containing LB spec; --vt_lb_spec to enable"; + auto lb_first_phase_info = "Force LB to run on the first phase (phase 0)"; auto s = app.add_flag("--vt_lb", appConfig.vt_lb, lb); auto t1 =
app.add_flag("--vt_lb_quiet", appConfig.vt_lb_quiet, lb_quiet); auto u = app.add_option("--vt_lb_file_name", appConfig.vt_lb_file_name, lb_file_name)->capture_default_str()->check(CLI::ExistingFile); @@ -935,6 +936,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) { auto lbasm = app.add_flag("--vt_lb_self_migration", appConfig.vt_lb_self_migration, lb_self_migration); auto lbspec = app.add_flag("--vt_lb_spec", appConfig.vt_lb_spec, lb_spec); auto lbspecfile = app.add_option("--vt_lb_spec_file", appConfig.vt_lb_spec_file, lb_spec_file)->capture_default_str()->check(CLI::ExistingFile); + auto lb_first_phase = app.add_flag("--vt_lb_run_lb_first_phase", appConfig.vt_lb_run_lb_first_phase, lb_first_phase_info); // --vt_lb_name excludes --vt_lb_file_name, and vice versa v->excludes(u); @@ -963,6 +965,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) { lbasm->group(debugLB); lbspec->group(debugLB); lbspecfile->group(debugLB); + lb_first_phase->group(debugLB); // help options deliberately omitted from the debugLB group above so that // they appear grouped with --vt_help when --vt_help is used diff --git a/src/vt/configs/types/types_sentinels.h b/src/vt/configs/types/types_sentinels.h index 9bcd343d25..57d38c151f 100644 --- a/src/vt/configs/types/types_sentinels.h +++ b/src/vt/configs/types/types_sentinels.h @@ -89,6 +89,7 @@ static constexpr SequentialIDType const first_seq_id = 1; static constexpr PriorityType const no_priority = 0; static constexpr PriorityLevelType const no_priority_level = 0; static constexpr ThreadIDType const no_thread_id = 0; +static constexpr SharedIDType const no_shared_id = -1; } // end namespace vt diff --git a/src/vt/configs/types/types_type.h b/src/vt/configs/types/types_type.h index d09d9fe4ca..9962261752 100644 --- a/src/vt/configs/types/types_type.h +++ b/src/vt/configs/types/types_type.h @@ -117,6 +117,8 @@ using PriorityLevelType = uint8_t; using ComponentIDType = uint32_t; /// Used to hold a unique ID for a user-level thread on a particular node using ThreadIDType = uint64_t; +/// Used to hold a shared ID +using SharedIDType = int; // Action types for attaching a closure to a runtime function /// Used for generically store an action to perform diff --git a/src/vt/elm/elm_comm.h b/src/vt/elm/elm_comm.h index 692d06700a..44f610a269 100644 --- a/src/vt/elm/elm_comm.h +++ b/src/vt/elm/elm_comm.h @@ -44,6 +44,7 @@ #if !defined INCLUDED_VT_ELM_ELM_COMM_H #define INCLUDED_VT_ELM_ELM_COMM_H +#include "vt/configs/types/types_type.h" #include "vt/elm/elm_id.h" #include @@ -58,7 +59,9 @@ enum struct CommCategory : int8_t { CollectionToNodeBcast = 5, NodeToCollectionBcast = 6, CollectiveToCollectionBcast = 7, - LocalInvoke = 8 + LocalInvoke = 8, + WriteShared = 9, + ReadOnlyShared = 10 }; inline NodeType objGetNode(ElementIDStruct const id) { @@ -71,6 +74,8 @@ struct CommKey { struct CollectionTag { }; struct CollectionToNodeTag { }; struct NodeToCollectionTag { }; + struct WriteSharedTag { }; + struct ReadOnlySharedTag { }; CommKey() = default; CommKey(CommKey const&) = default; @@ -107,12 +112,25 @@ struct CommKey { cat_(bcast ? 
CommCategory::NodeToCollectionBcast : CommCategory::NodeToCollection) { } + CommKey( + WriteSharedTag, + NodeType in_home, int in_shared_id + ) : nto_(in_home), shared_id_(in_shared_id), cat_(CommCategory::WriteShared) + { } + + CommKey( + ReadOnlySharedTag, + NodeType in_home, int in_shared_id + ) : nto_(in_home), shared_id_(in_shared_id), cat_(CommCategory::ReadOnlyShared) + { } + ElementIDStruct from_ = {}; ElementIDStruct to_ = {}; ElementIDStruct edge_id_ = {}; NodeType nfrom_ = uninitialized_destination; NodeType nto_ = uninitialized_destination; + SharedIDType shared_id_ = no_shared_id; CommCategory cat_ = CommCategory::SendRecv; ElementIDStruct fromObj() const { return from_; } @@ -121,6 +139,7 @@ struct CommKey { ElementIDType toNode() const { return nto_; } ElementIDStruct edgeID() const { return edge_id_; } CommCategory commCategory() const { return cat_; } + int sharedID() const { return shared_id_; } bool selfEdge() const { return cat_ == CommCategory::SendRecv and from_ == to_; } bool offNode() const { @@ -140,12 +159,12 @@ struct CommKey { return k.from_ == from_ and k.to_ == to_ and k.nfrom_ == nfrom_ and k.nto_ == nto_ and - k.cat_ == cat_; + k.cat_ == cat_ and k.shared_id_ == shared_id_; } template void serialize(SerializerT& s) { - s | from_ | to_ | nfrom_ | nto_ | cat_ | edge_id_; + s | from_ | to_ | nfrom_ | nto_ | cat_ | edge_id_ | shared_id_; } }; @@ -189,7 +208,8 @@ struct hash { size_t operator()(vt::elm::CommKey const& in) const { return std::hash()( std::hash()(in.from_) ^ - std::hash()(in.to_) ^ in.nfrom_ ^ in.nto_ + std::hash()(in.to_) ^ in.nfrom_ ^ in.nto_ ^ + in.shared_id_ ); } }; diff --git a/src/vt/elm/elm_id.cc b/src/vt/elm/elm_id.cc index 089ed2953e..a9303a2f3e 100644 --- a/src/vt/elm/elm_id.cc +++ b/src/vt/elm/elm_id.cc @@ -41,6 +41,7 @@ //@HEADER */ +#include "vt/context/context.h" #include "vt/elm/elm_id.h" #include "vt/elm/elm_id_bits.h" @@ -58,4 +59,8 @@ NodeType ElementIDStruct::getCurrNode() const { return curr_node; } +bool ElementIDStruct::isLocatedOnThisNode() const { + return theContext()->getNode() == curr_node and not isMigratable(); +} + }} /* end namespace vt::elm */ diff --git a/src/vt/elm/elm_id.h b/src/vt/elm/elm_id.h index d4baf3446a..17fd2a6db3 100644 --- a/src/vt/elm/elm_id.h +++ b/src/vt/elm/elm_id.h @@ -78,6 +78,7 @@ struct ElementIDStruct { bool isMigratable() const; NodeType getHomeNode() const; NodeType getCurrNode() const; + bool isLocatedOnThisNode() const; }; diff --git a/src/vt/elm/elm_lb_data.cc b/src/vt/elm/elm_lb_data.cc index 71796b3b60..dfa7bd82e1 100644 --- a/src/vt/elm/elm_lb_data.cc +++ b/src/vt/elm/elm_lb_data.cc @@ -86,6 +86,22 @@ void ElementLBData::sendToEntity( sendComm(key, bytes); } +void ElementLBData::addWritableSharedID( + NodeType home, int shared_id, double bytes +) { + sendComm( + elm::CommKey{elm::CommKey::WriteSharedTag{}, home, shared_id}, bytes + ); +} + +void ElementLBData::addReadOnlySharedID( + NodeType home, int shared_id, double bytes +) { + sendComm( + elm::CommKey{elm::CommKey::ReadOnlySharedTag{}, home, shared_id}, bytes + ); +} + void ElementLBData::sendComm(elm::CommKey key, double bytes) { phase_comm_[cur_phase_][key].sendMsg(bytes); subphase_comm_[cur_phase_].resize(cur_subphase_ + 1); diff --git a/src/vt/elm/elm_lb_data.h b/src/vt/elm/elm_lb_data.h index 01b6c4c8b6..e7b0c3e3f3 100644 --- a/src/vt/elm/elm_lb_data.h +++ b/src/vt/elm/elm_lb_data.h @@ -72,6 +72,9 @@ struct ElementLBData { void sendToEntity(ElementIDStruct to, ElementIDStruct from, double bytes); void sendComm(elm::CommKey 
key, double bytes); + void addWritableSharedID(NodeType home, int shared_id, double bytes); + void addReadOnlySharedID(NodeType home, int shared_id, double bytes); + void recvComm(elm::CommKey key, double bytes); void recvObjData( ElementIDStruct to_perm, diff --git a/src/vt/messaging/active.h b/src/vt/messaging/active.h index 554b89b12a..8721080054 100644 --- a/src/vt/messaging/active.h +++ b/src/vt/messaging/active.h @@ -1722,6 +1722,19 @@ struct ActiveMessenger : runtime::component::PollableComponent MsgSizeType const msg_size ); +public: + /** + * \brief Get the rank-based LB data along with element ID for rank-based work + * + * \return tuple with pointers to each one + */ + auto getRankLBData() { + return std::make_tuple( + &bare_handler_dummy_elm_id_for_lb_data_, + &bare_handler_lb_data_ + ); + } + private: # if vt_check_enabled(trace_enabled) trace::UserEventIDType trace_irecv = trace::no_user_event_id; diff --git a/src/vt/vrt/collection/balance/baselb/baselb.cc b/src/vt/vrt/collection/balance/baselb/baselb.cc index 4f19fc9648..dcbb3f4f3a 100644 --- a/src/vt/vrt/collection/balance/baselb/baselb.cc +++ b/src/vt/vrt/collection/balance/baselb/baselb.cc @@ -143,6 +143,9 @@ std::shared_ptr BaseLB::normalizeReassignments() { auto const new_node = std::get<1>(transfer); auto const current_node = obj_id.curr_node; + vtAbortIf( + not obj_id.isMigratable(), "Transferring object that is not migratable" + ); if (current_node == new_node) { vt_debug_print( verbose, lb, "BaseLB::normalizeReassignments(): self migration\n" diff --git a/src/vt/vrt/collection/balance/lb_data_holder.cc b/src/vt/vrt/collection/balance/lb_data_holder.cc index 701d3d27bb..dd8a3dfb34 100644 --- a/src/vt/vrt/collection/balance/lb_data_holder.cc +++ b/src/vt/vrt/collection/balance/lb_data_holder.cc @@ -252,9 +252,7 @@ std::unique_ptr LBDataHolder::toJson(PhaseType phase) const { i = 0; if (node_comm_.find(phase) != node_comm_.end()) { - for (auto&& elm : node_comm_.at(phase)) { - auto volume = elm.second; - auto const& key = elm.first; + for (auto const& [key, volume] : node_comm_.at(phase)) { j["communications"][i]["bytes"] = volume.bytes; j["communications"][i]["messages"] = volume.messages; @@ -296,6 +294,17 @@ std::unique_ptr LBDataHolder::toJson(PhaseType phase) const { outputEntity(j["communications"][i]["from"], key.fromObj()); break; } + case elm::CommCategory::ReadOnlyShared: + case elm::CommCategory::WriteShared: { + j["communications"][i]["type"] = + (key.cat_ == elm::CommCategory::ReadOnlyShared) ?
+ "ReadOnlyShared" : "WriteShared"; + j["communications"][i]["to"]["type"] = "node"; + j["communications"][i]["to"]["id"] = key.toNode(); + j["communications"][i]["from"]["type"] = "shared_id"; + j["communications"][i]["from"]["id"] = key.sharedID(); + break; + } case elm::CommCategory::LocalInvoke: case elm::CommCategory::CollectiveToCollectionBcast: // not currently supported @@ -476,6 +485,34 @@ LBDataHolder::LBDataHolder(nlohmann::json const& j) ); CommVolume vol{bytes, messages}; this->node_comm_[id][key] = vol; + } else if ( + type == "ReadOnlyShared" or type == "WriteShared" + ) { + vtAssertExpr(comm["from"]["type"] == "shared_id"); + vtAssertExpr(comm["to"]["type"] == "node"); + + CommVolume vol{bytes, messages}; + auto to_node = comm["to"]["id"]; + vtAssertExpr(to_node.is_number()); + + auto from_shared_id = comm["from"]["id"]; + vtAssertExpr(from_shared_id.is_number()); + + if (type == "ReadOnlyShared") { + CommKey key( + CommKey::ReadOnlySharedTag{}, + static_cast(to_node), + static_cast(from_shared_id) + ); + this->node_comm_[id][key] = vol; + } else { + CommKey key( + CommKey::WriteSharedTag{}, + static_cast(to_node), + static_cast(from_shared_id) + ); + this->node_comm_[id][key] = vol; + } } } } diff --git a/src/vt/vrt/collection/balance/lb_data_holder.h b/src/vt/vrt/collection/balance/lb_data_holder.h index fb2c9fce48..729949693f 100644 --- a/src/vt/vrt/collection/balance/lb_data_holder.h +++ b/src/vt/vrt/collection/balance/lb_data_holder.h @@ -46,12 +46,9 @@ #include "vt/config.h" #include "vt/vrt/collection/balance/lb_common.h" -#include "vt/elm/elm_comm.h" #include #include -#include -#include #include diff --git a/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc b/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc index 50898735c3..de4705aa5e 100644 --- a/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc +++ b/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc @@ -130,7 +130,11 @@ LBType LBManager::decideLBToRun(PhaseType phase, bool try_file) { } else { auto interval = theConfig()->vt_lb_interval; vtAssert(interval != 0, "LB Interval must not be 0"); - if (phase % interval == 1 || (interval == 1 && phase != 0)) { + vt::PhaseType offset = theConfig()->vt_lb_run_lb_first_phase ? 
0 : 1; + if ( + phase % interval == offset || + (interval == 1 && phase != 0) + ) { bool name_match = false; for (auto&& elm : get_lb_names()) { if (elm.second == theConfig()->vt_lb_name) { diff --git a/src/vt/vrt/collection/balance/model/composed_model.cc b/src/vt/vrt/collection/balance/model/composed_model.cc index 1090662414..f7b9249692 100644 --- a/src/vt/vrt/collection/balance/model/composed_model.cc +++ b/src/vt/vrt/collection/balance/model/composed_model.cc @@ -102,4 +102,8 @@ int ComposedModel::getNumSubphases() const { return base_->getNumSubphases(); } +CommMapType ComposedModel::getComm(PhaseOffset when) const { + return base_->getComm(when); +} + }}}} diff --git a/src/vt/vrt/collection/balance/model/composed_model.h b/src/vt/vrt/collection/balance/model/composed_model.h index 9330afda65..d8b4f707c7 100644 --- a/src/vt/vrt/collection/balance/model/composed_model.h +++ b/src/vt/vrt/collection/balance/model/composed_model.h @@ -77,6 +77,7 @@ class ComposedModel : public LoadModel bool hasUserData() const override; ElmUserDataType getUserData(ElementIDStruct object, PhaseOffset when) const override; unsigned int getNumPastPhasesNeeded(unsigned int look_back) const override; + CommMapType getComm(PhaseOffset offset) const override; ObjectIterator begin() const override; diff --git a/src/vt/vrt/collection/balance/model/load_model.h b/src/vt/vrt/collection/balance/model/load_model.h index e58d6e5468..535d7f339b 100644 --- a/src/vt/vrt/collection/balance/model/load_model.h +++ b/src/vt/vrt/collection/balance/model/load_model.h @@ -215,6 +215,17 @@ struct LoadModel */ virtual void updateLoads(PhaseType last_completed_phase) = 0; + /** + * \brief Provide all the comm info for a given phase + * + * \param[in] when the interval in which comm is desired + * + * \return the comm info + */ + virtual CommMapType getComm([[maybe_unused]] PhaseOffset when) const { + return CommMapType{}; + } + /** * \brief Provide an estimate of the given object's load during a specified interval * diff --git a/src/vt/vrt/collection/balance/model/raw_data.cc b/src/vt/vrt/collection/balance/model/raw_data.cc index c0209bdd39..b5199ef4b0 100644 --- a/src/vt/vrt/collection/balance/model/raw_data.cc +++ b/src/vt/vrt/collection/balance/model/raw_data.cc @@ -130,6 +130,15 @@ ElmUserDataType RawData::getUserData(ElementIDStruct object, PhaseOffset offset) } } +CommMapType RawData::getComm(PhaseOffset offset) const { + auto phase = getNumCompletedPhases() + offset.phases; + if (auto it = proc_comm_->find(phase); it != proc_comm_->end()) { + return it->second; + } else { + return CommMapType{}; + } +} + unsigned int RawData::getNumPastPhasesNeeded(unsigned int look_back) const { return look_back; diff --git a/src/vt/vrt/collection/balance/model/raw_data.h b/src/vt/vrt/collection/balance/model/raw_data.h index 6d7ec73c21..c7d6bef224 100644 --- a/src/vt/vrt/collection/balance/model/raw_data.h +++ b/src/vt/vrt/collection/balance/model/raw_data.h @@ -64,6 +64,7 @@ struct RawData : public LoadModel { LoadType getRawLoad(ElementIDStruct object, PhaseOffset when) const override; bool hasUserData() const override { return user_data_ != nullptr; } ElmUserDataType getUserData(ElementIDStruct object, PhaseOffset when) const override; + CommMapType getComm(PhaseOffset when) const override; void setLoads(std::unordered_map const* proc_load, std::unordered_map const* proc_comm, diff --git a/src/vt/vrt/collection/balance/node_lb_data.cc b/src/vt/vrt/collection/balance/node_lb_data.cc index 6805f1e933..e6efc89807 100644 --- 
a/src/vt/vrt/collection/balance/node_lb_data.cc +++ b/src/vt/vrt/collection/balance/node_lb_data.cc @@ -272,6 +272,8 @@ getRecvSendDirection(elm::CommKeyType const& comm) { // this case is just to avoid warning of not handled enum case elm::CommCategory::CollectiveToCollectionBcast: case elm::CommCategory::LocalInvoke: + case elm::CommCategory::WriteShared: + case elm::CommCategory::ReadOnlyShared: return std::make_pair(ElementIDType{}, ElementIDType{}); } diff --git a/src/vt/vrt/collection/balance/temperedlb/criterion.h b/src/vt/vrt/collection/balance/temperedlb/criterion.h index 42e8b7befe..dfbc79f380 100644 --- a/src/vt/vrt/collection/balance/temperedlb/criterion.h +++ b/src/vt/vrt/collection/balance/temperedlb/criterion.h @@ -63,7 +63,7 @@ struct GrapevineCriterion { struct ModifiedGrapevineCriterion { bool operator()(LoadType over, LoadType under, LoadType obj, LoadType) const { - return obj < over - under; + return obj <= over - under; } }; diff --git a/src/vt/vrt/collection/balance/temperedlb/tempered_enums.h b/src/vt/vrt/collection/balance/temperedlb/tempered_enums.h index b56414a333..9313c6241a 100644 --- a/src/vt/vrt/collection/balance/temperedlb/tempered_enums.h +++ b/src/vt/vrt/collection/balance/temperedlb/tempered_enums.h @@ -71,6 +71,36 @@ enum struct InformTypeEnum : uint8_t { AsyncInform = 1 }; +/// Enum for the strategy to be used in the transfer stage +enum struct TransferTypeEnum : uint8_t { + /** + * \brief Original strategy + * + * Transfer one object per transfer as in the original Grapevine approach. + */ + Original = 0, + /** + * \brief Original strategy improved by recursion + * + * When a single object transfer is rejected, attempt to recurse in order to + * pull more objects into the transfer and thereby minimize the work added by + * said transfer. + * This is especially useful when communication is taken into account, as + * object transfers typically disrupt local vs. global communication edges. + */ + Recursive = 1, + /** + * \brief Form object clusters and attempt to perform swaps. + * + * Objects can be clustered according to an arbitrary definition, and swaps + * of entire clusters, including the nullset, between ranks are attempted. + * This is especially useful when shared memory constraints are present, + * as breaking shared memory clusters results in higher overall memory + * footprint, in contrast with whole cluster swaps. + */ + SwapClusters = 2, +}; + /// Enum for the order in which local objects are considered for transfer enum struct ObjectOrderEnum : uint8_t { Arbitrary = 0, //< Arbitrary order: iterate as defined by the unordered_map @@ -122,14 +152,6 @@ enum struct CMFTypeEnum : uint8_t { * target load and the load of the most loaded processor in the CMF. */ NormByMax = 1, - /** - * \brief Compute the CMF factor using the load of this processor - * - * Do not remove processors from the CMF that exceed the target load until the - * next iteration. Use a CMF factor of 1.0/x, where x is the load of the - * processor that is computing the CMF. - */ - NormBySelf = 2, /** * \brief Narrow the CMF to only include processors that can accommodate the * transfer * * Use a CMF factor of 1.0/x, where x is the greater of the target load and * the load of the most loaded processor in the CMF. Only include processors * in the CMF that will pass the chosen Criterion for the object being * considered for transfer.
*/ - NormByMaxExcludeIneligible = 3, + NormByMaxExcludeIneligible = 2, }; /// Enum for determining fanout and rounds diff --git a/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h b/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h index 5eb188a827..65c5168cd3 100644 --- a/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h +++ b/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h @@ -46,28 +46,129 @@ #include "vt/config.h" -#include +#include INCLUDE_FMT_FORMAT + #include +namespace vt::vrt::collection::lb { + +using BytesType = double; + +struct ClusterInfo { + LoadType load = 0; + BytesType bytes = 0; + double intra_send_vol = 0, intra_recv_vol = 0; + std::unordered_map inter_send_vol, inter_recv_vol; + NodeType home_node = uninitialized_destination; + BytesType edge_weight = 0; + BytesType max_object_working_bytes = 0; + BytesType max_object_working_bytes_outside = 0; + BytesType max_object_serialized_bytes = 0; + BytesType max_object_serialized_bytes_outside = 0; + BytesType cluster_footprint = 0; + + template + void serialize(SerializerT& s) { + s | load | bytes | intra_send_vol | intra_recv_vol; + s | inter_send_vol | inter_recv_vol; + s | home_node | edge_weight; + s | max_object_working_bytes; + s | max_object_working_bytes_outside; + s | max_object_serialized_bytes; + s | max_object_serialized_bytes_outside; + s | cluster_footprint; + } +}; + +struct NodeInfo { + LoadType load = 0; + LoadType work = 0; + double inter_send_vol = 0, inter_recv_vol = 0; + double intra_send_vol = 0, intra_recv_vol = 0; + double shared_vol = 0; + + template + void serialize(SerializerT& s) { + s | load | work; + s | inter_send_vol | inter_recv_vol; + s | intra_send_vol | intra_recv_vol; + s | shared_vol; + } +}; + +using ClusterSummaryType = std::unordered_map; +using RankSummaryType = std::tuple; + +} /* end namespace vt::vrt::collection::lb */ + +VT_FMT_NAMESPACE_BEGIN + +/// Custom fmt formatter/print for \c vt::vrt::collection::lb::ClusterInfo +template <> +struct formatter<::vt::vrt::collection::lb::ClusterInfo> { + /// Parses format specifications of the form ['x' | 'd' | 'b']. + auto parse(format_parse_context& ctx) -> decltype(ctx.begin()) { + // Parse the presentation format and store it in the formatter: + auto it = ctx.begin(), end = ctx.end(); + + // Check if reached the end of the range: + if (it != end && *it != '}') { + throw format_error("invalid format"); + } + + // Return an iterator past the end of the parsed range: + return it; + } + + /// Formats the epoch using the parsed format specification (presentation) + /// stored in this formatter. 
+ template + auto format( + ::vt::vrt::collection::lb::ClusterInfo const& e, FormatContext& ctx + ) { + auto fmt_str = "(load={},bytes={},intra=({},{})),home={},edge={}"; + return format_to( + ctx.out(), fmt_str, e.load, e.bytes, e.intra_send_vol, e.intra_recv_vol, + e.home_node, e.edge_weight + ); + } +}; + +VT_FMT_NAMESPACE_END + namespace vt { namespace vrt { namespace collection { namespace balance { struct LoadMsg : vt::Message { using MessageParentType = vt::Message; vt_msg_serialize_required(); // node_load_ - using NodeLoadType = std::unordered_map; + using NodeClusterSummaryType = + std::unordered_map; + using NodeInfoType = std::unordered_map; LoadMsg() = default; - LoadMsg(NodeType in_from_node, NodeLoadType const& in_node_load) - : from_node_(in_from_node), node_load_(in_node_load) + LoadMsg(NodeType in_from_node, NodeInfoType const& in_node_info) + : from_node_(in_from_node), node_info_(in_node_info) { } - NodeLoadType const& getNodeLoad() const { - return node_load_; + NodeInfoType const& getNodeInfo() const { + return node_info_; + } + + NodeClusterSummaryType const& getNodeClusterSummary() const { + return node_cluster_summary_; + } + + void addNodeInfo(NodeType node, lb::NodeInfo info) { + node_info_[node] = info; } - void addNodeLoad(NodeType node, LoadType load) { - node_load_[node] = load; + void addNodeClusters( + NodeType node, + lb::BytesType rank_working_bytes, + lb::ClusterSummaryType summary + ) { + node_cluster_summary_[node] = std::make_tuple(rank_working_bytes, summary); } NodeType getFromNode() const { return from_node_; } @@ -76,12 +177,14 @@ struct LoadMsg : vt::Message { void serialize(SerializerT& s) { MessageParentType::serialize(s); s | from_node_; - s | node_load_; + s | node_info_; + s | node_cluster_summary_; } private: NodeType from_node_ = uninitialized_destination; - NodeLoadType node_load_ = {}; + NodeInfoType node_info_ = {}; + NodeClusterSummaryType node_cluster_summary_ = {}; }; struct LoadMsgAsync : LoadMsg { @@ -90,9 +193,9 @@ struct LoadMsgAsync : LoadMsg { LoadMsgAsync() = default; LoadMsgAsync( - NodeType in_from_node, NodeLoadType const& in_node_load, int round + NodeType in_from_node, NodeInfoType const& in_node_info, int round ) - : LoadMsg(in_from_node, in_node_load), round_(round) + : LoadMsg(in_from_node, in_node_info), round_(round) { } uint8_t getRound() const { diff --git a/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc b/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc index f73a54d01f..b0d73651ce 100644 --- a/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc +++ b/src/vt/vrt/collection/balance/temperedlb/temperedlb.cc @@ -57,7 +57,7 @@ #include #include #include -#include +#include namespace vt { namespace vrt { namespace collection { namespace lb { @@ -166,12 +166,34 @@ Default: AsyncInform loads when the first message for a round is received, avoiding the synchronization cost but delaying the propagation of some information until the following round. +)" + }, + { + "transfer", + R"( +Values: {Original, Recursive, SwapClusters} +Default: Original +Description: + Transfer strategy to be used in transfer stage. Options are: + Original: transfer one object per transfer as in original Grapevine approach. + Recursive: original strategy improved by recursion. + When single object transfer is rejected, attempt to recurse in order to + pull more objects into the transfer and hereby minimize work added by + said transfer. 
+ This is especially useful when communication is taken into account, as + object transfers typically disrupt local vs. global communication edges. + SwapClusters: form object clusters and attempt to perform swaps. + Objects can be clustered according to an arbitrary definition, and swaps + of entire clusters, including the nullset, between ranks are attempted. + This is especially useful when shared memory constraints are present, + as breaking shared memory clusters results in higher overall memory + footprint, in contrast with whole cluster swaps. )" }, { "ordering", R"( -Values: {Arbitrary, ElmID, FewestMigrations, SmallObject, LargestObjects} +Values: {Arbitrary, ElmID, FewestMigrations, SmallObjects, LargestObjects} Default: FewestMigrations Description: The order in which local objects are considered for transfer. Options are: @@ -193,7 +215,7 @@ Default: FewestMigrations { "cmf", R"( -Values: {Original, NormByMax, NormBySelf, NormByMaxExcludeIneligible} +Values: {Original, NormByMax, NormByMaxExcludeIneligible} Default: NormByMax Description: Approach for computing the CMF used to pick an object to transfer. Options @@ -207,10 +229,6 @@ Default: NormByMax until the next iteration. Use a CMF factor of 1.0/x, where x is the greater of the target load and the load of the most loaded processor in the CMF. - NormBySelf: compute the CMF factor using the load of this processor. Do not - remove processors from the CMF that exceed the target load until the next - iteration. Use a CMF factor of 1.0/x, where x is the load of the processor - that is computing the CMF. NormByMaxExcludeIneligible: narrow the CMF to only include processors that can accommodate the transfer. Use a CMF factor of 1.0/x, where x is the greater of the target load and the load of the most loaded processor in @@ -239,6 +257,7 @@ Default: true Description: If the final iteration of a trial has a worse imbalance than any earlier iteration, it will roll back to the iteration with the best imbalance. + If transfer_strategy is SwapClusters, rollback is automatically set to false. )" }, { @@ -252,6 +271,55 @@ Default: false instead of the processor-average load. )" }, + { + "memory_threshold", + R"( +Values: +Default: 0 +Description: The memory threshold TemperedLB should strictly stay under, which is +respected if memory information is present in the user-defined data.
+)" }, { "alpha", R"( +Values: +Default: 1.0 +Description: α in the work model (load term) +)" }, { "beta", R"( +Values: +Default: 0.0 +Description: β in the work model (inter-node communication term) +)" }, { "gamma", R"( +Values: +Default: 0.0 +Description: γ in the work model (intra-node communication term) +)" }, { "delta", R"( +Values: +Default: 0.0 +Description: δ in the work model (shared-memory edge term) +)" }, { "epsilon", R"( +Values: +Default: infinity +Description: ε in the work model (memory term) +)" } }; return keys_help; } @@ -350,12 +418,19 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { vtAbort(s); } + alpha = config->getOrDefault("alpha", alpha); + beta = config->getOrDefault("beta", beta); + gamma = config->getOrDefault("gamma", gamma); + delta = config->getOrDefault("delta", delta); + epsilon = config->getOrDefault("epsilon", epsilon); + num_iters_ = config->getOrDefault("iters", num_iters_); num_trials_ = config->getOrDefault("trials", num_trials_); deterministic_ = config->getOrDefault("deterministic", deterministic_); rollback_ = config->getOrDefault("rollback", rollback_); target_pole_ = config->getOrDefault("targetpole", target_pole_); + mem_thresh_ = config->getOrDefault("memory_threshold", mem_thresh_); balance::LBArgsEnumConverter criterion_converter_( "criterion", "CriterionEnum", { @@ -373,6 +448,19 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { ); inform_type_ = inform_type_converter_.getFromConfig(config, inform_type_); + balance::LBArgsEnumConverter transfer_type_converter_( "transfer", "TransferTypeEnum", { {TransferTypeEnum::Original, "Original"}, {TransferTypeEnum::Recursive, "Recursive"}, {TransferTypeEnum::SwapClusters, "SwapClusters"} } ); + transfer_type_ = transfer_type_converter_.getFromConfig(config, transfer_type_); + + if (transfer_type_ == TransferTypeEnum::SwapClusters) { + rollback_ = false; + } + balance::LBArgsEnumConverter obj_ordering_converter_( "ordering", "ObjectOrderEnum", { {ObjectOrderEnum::Arbitrary, "Arbitrary"}, @@ -388,7 +476,6 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { "cmf", "CMFTypeEnum", { {CMFTypeEnum::Original, "Original"}, {CMFTypeEnum::NormByMax, "NormByMax"}, - {CMFTypeEnum::NormBySelf, "NormBySelf"}, {CMFTypeEnum::NormByMaxExcludeIneligible, "NormByMaxExcludeIneligible"} } ); @@ -410,10 +497,11 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { terse, temperedlb, "TemperedLB::inputParams: using knowledge={}, fanout={}, rounds={}, " "iters={}, criterion={}, trials={}, deterministic={}, inform={}, " - "ordering={}, cmf={}, rollback={}, targetpole={}\n", + "transfer={}, ordering={}, cmf={}, rollback={}, targetpole={}\n", knowledge_converter_.getString(knowledge_), f_, k_max_, num_iters_, criterion_converter_.getString(criterion_), num_trials_, deterministic_, inform_type_converter_.getString(inform_type_), + transfer_type_converter_.getString(transfer_type_), obj_ordering_converter_.getString(obj_ordering_), cmf_type_converter_.getString(cmf_type_), rollback_, target_pole_ ); @@ -423,9 +511,9 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { void TemperedLB::runLB(LoadType total_load) { bool should_lb = false; + // Compute load statistics this_load = total_load; stats = *getStats(); - auto const avg = stats.at(lb::Statistic::Rank_load_modeled).at( lb::StatisticQuantity::avg ); @@ -453,6 +541,7 @@ void TemperedLB::runLB(LoadType total_load) {
should_lb = max > (run_temperedlb_tolerance + 1.0) * target_max_load_; } + // Report statistics from head rank if (theContext()->getNode() == 0) { vt_debug_print( terse, temperedlb, @@ -469,9 +558,483 @@ void TemperedLB::runLB(LoadType total_load) { } } + // Perform load rebalancing when deemed necessary if (should_lb) { - doLBStages(imb); +#if vt_check_enabled(trace_enabled) + theTrace()->disableTracing(); +#endif + + runInEpochCollective("doLBStages", [&,this]{ + auto this_node = theContext()->getNode(); + proxy_[this_node].template send<&TemperedLB::doLBStages>(imb); + }); + +#if vt_check_enabled(trace_enabled) + theTrace()->enableTracing(); +#endif + } +} + +void TemperedLB::readClustersMemoryData() { + if (load_model_->hasUserData()) { + for (auto obj : *load_model_) { + if (obj.isMigratable()) { + auto data_map = load_model_->getUserData( + obj, {balance::PhaseOffset::NEXT_PHASE, balance::PhaseOffset::WHOLE_PHASE} + ); + + SharedIDType shared_id = vt::no_shared_id; + vt::NodeType home_rank = vt::uninitialized_destination; + BytesType shared_bytes = 0; + BytesType working_bytes = 0; + BytesType footprint_bytes = 0; + BytesType serialized_bytes = 0; + for (auto const& [key, variant] : data_map) { + auto val = std::get_if(&variant); + vtAbortIf(!val, '"' + key + "\" in variant does not match double"); + + if (key == "shared_id") { + // Because of how JSON is stored this is always a double, even + // though it should be an integer + shared_id = static_cast(*val); + } + if (key == "home_rank") { + // Because of how JSON is stored this is always a double, even + // though it should be an integer + home_rank = static_cast(*val); + } + if (key == "shared_bytes") { + shared_bytes = *val; + } + if (key == "task_working_bytes") { + working_bytes = *val; + } + if (key == "task_footprint_bytes") { + footprint_bytes = *val; + } + if (key == "task_serialized_bytes") { + serialized_bytes = *val; + } + if (key == "rank_working_bytes") { + rank_bytes_ = *val; + } + } + + vt_debug_print( + verbose, temperedlb, + "obj={} sid={} bytes={} footprint={} serialized={}, working={}\n", + obj, shared_id, shared_bytes, footprint_bytes, serialized_bytes, + working_bytes + ); + + obj_shared_block_[obj] = shared_id; + obj_working_bytes_[obj] = working_bytes; + obj_footprint_bytes_[obj] = footprint_bytes; + obj_serialized_bytes_[obj] = serialized_bytes; + shared_block_size_[shared_id] = shared_bytes; + shared_block_edge_[shared_id] = std::make_tuple(home_rank, shared_bytes); + } + } + } +} + +void TemperedLB::computeClusterSummary() { + cur_clusters_.clear(); + + auto const this_node = theContext()->getNode(); + + for (auto const& [shared_id, shared_bytes] : shared_block_size_) { + auto const& [home_node, shared_volume] = shared_block_edge_[shared_id]; + + ClusterInfo info; + info.bytes = shared_bytes; + info.home_node = home_node; + info.edge_weight = shared_volume; + + std::set cluster_objs; + BytesType max_object_working_bytes = 0; + BytesType max_object_working_bytes_outside = 0; + BytesType max_object_serialized_bytes = 0; + BytesType max_object_serialized_bytes_outside = 0; + BytesType cluster_footprint = 0; + + for (auto const& [obj_id, obj_load] : cur_objs_) { + if (auto iter = obj_shared_block_.find(obj_id); iter != obj_shared_block_.end()) { + if (iter->second == shared_id) { + cluster_objs.insert(obj_id); + info.load += obj_load; + if ( + auto it = obj_working_bytes_.find(obj_id); + it != obj_working_bytes_.end() + ) { + max_object_working_bytes = std::max( + max_object_working_bytes, it->second + ); + } 
+ if ( + auto it = obj_serialized_bytes_.find(obj_id); + it != obj_serialized_bytes_.end() + ) { + max_object_serialized_bytes = std::max( + max_object_serialized_bytes, it->second + ); + } + if ( + auto it = obj_footprint_bytes_.find(obj_id); + it != obj_footprint_bytes_.end() + ) { + cluster_footprint += it->second; + } + } else { + if ( + auto it = obj_working_bytes_.find(obj_id); + it != obj_working_bytes_.end() + ) { + max_object_working_bytes_outside = std::max( + max_object_working_bytes_outside, it->second + ); + } + if ( + auto it = obj_serialized_bytes_.find(obj_id); + it != obj_serialized_bytes_.end() + ) { + max_object_serialized_bytes_outside = std::max( + max_object_serialized_bytes_outside, it->second + ); + } + } + } + } + + info.cluster_footprint = cluster_footprint; + info.max_object_working_bytes = max_object_working_bytes; + info.max_object_working_bytes_outside = max_object_working_bytes_outside; + info.max_object_serialized_bytes = max_object_serialized_bytes; + info.max_object_serialized_bytes_outside = max_object_serialized_bytes_outside; + + if (info.load != 0) { + for (auto&& obj : cluster_objs) { + if (auto it = send_edges_.find(obj); it != send_edges_.end()) { + for (auto const& [target, volume] : it->second) { + vt_debug_print( + verbose, temperedlb, + "computeClusterSummary: send obj={}, target={}\n", + obj, target + ); + + if (cluster_objs.find(target) != cluster_objs.end()) { + // intra-cluster edge + info.intra_send_vol += volume; + } else if ( + cur_objs_.find(target) != cur_objs_.end() or + target.isLocatedOnThisNode() + ) { + // intra-rank edge + info.inter_send_vol[this_node] += volume; + } else { + // inter-rank edge + info.inter_send_vol[target.getCurrNode()] += volume; + } + } + } + if (auto it = recv_edges_.find(obj); it != recv_edges_.end()) { + for (auto const& [target, volume] : it->second) { + vt_debug_print( + verbose, temperedlb, + "computeClusterSummary: recv obj={}, target={}\n", + obj, target + ); + if (cluster_objs.find(target) != cluster_objs.end()) { + // intra-cluster edge + info.intra_recv_vol += volume; + } else if ( + cur_objs_.find(target) != cur_objs_.end() or + target.isLocatedOnThisNode() + ) { + // intra-rank edge + info.inter_recv_vol[this_node] += volume; + } else { + // inter-rank edge + info.inter_recv_vol[target.getCurrNode()] += volume; + } + } + } + } + + cur_clusters_.emplace(shared_id, std::move(info)); + } + } +} + +BytesType TemperedLB::computeMemoryUsage() { + // Compute bytes used by shared blocks mapped here based on object mapping + auto const blocks_here = getSharedBlocksHere(); + + double total_shared_bytes = 0; + for (auto const& block_id : blocks_here) { + total_shared_bytes += shared_block_size_.find(block_id)->second; } + + // Compute max object working and serialized bytes + for (auto const& [obj_id, _] : cur_objs_) { + if ( + auto it = obj_serialized_bytes_.find(obj_id); + it != obj_serialized_bytes_.end() + ) { + max_object_serialized_bytes_ = + std::max(max_object_serialized_bytes_, it->second); + } + if ( + auto it = obj_working_bytes_.find(obj_id); + it != obj_working_bytes_.end() + ) { + max_object_working_bytes_ = + std::max(max_object_working_bytes_, it->second); + } else { + vt_debug_print( + verbose, temperedlb, + "Warning: working bytes not found for object: {}\n", obj_id + ); + } + } + + // Sum up all footprint bytes + double object_footprint_bytes = 0; + for (auto const& [obj_id, _] : cur_objs_) { + if ( + auto it = obj_footprint_bytes_.find(obj_id); + it != obj_footprint_bytes_.end() + ) { + 
object_footprint_bytes += it->second; + } + } + + return current_memory_usage_ = + rank_bytes_ + + total_shared_bytes + + max_object_working_bytes_ + + object_footprint_bytes + + max_object_serialized_bytes_; +} + +std::set TemperedLB::getSharedBlocksHere() const { + std::set blocks_here; + for (auto const& [obj, _] : cur_objs_) { + if (obj_shared_block_.find(obj) != obj_shared_block_.end()) { + blocks_here.insert(obj_shared_block_.find(obj)->second); + } + } + return blocks_here; +} + +int TemperedLB::getRemoteBlockCountHere() const { + auto this_node = theContext()->getNode(); + auto const& shared_blocks_here = getSharedBlocksHere(); + int remote_block_count = 0; + for (auto const& sid : shared_blocks_here) { + if (auto it = shared_block_edge_.find(sid); it != shared_block_edge_.end()) { + auto const& [home_node, volume] = it->second; + if (home_node != this_node) { + remote_block_count++; + } + } else { + vtAbort("Could not find shared edge volume!"); + } + } + return remote_block_count; +} + +void TemperedLB::workStatsHandler(std::vector const& vec) { + auto const& work = vec[1]; + work_mean_ = work.avg(); + work_max_ = work.max(); + new_work_imbalance_ = work.I(); +} + +double TemperedLB::computeWork( + double load, double inter_comm_bytes, double intra_comm_bytes, + double shared_comm_bytes +) const { + // The work model based on input parameters (excluding epsilon) + return + alpha * load + + beta * inter_comm_bytes + + gamma * intra_comm_bytes + + delta * shared_comm_bytes; +} + +WorkBreakdown TemperedLB::computeWorkBreakdown( + NodeType node, + std::unordered_map const& objs, + std::set const& exclude, + std::unordered_map const& include +) { + double load = 0; + + // Communication bytes sent/recv'ed within the rank + double intra_rank_bytes_sent = 0, intra_rank_bytes_recv = 0; + // Communication bytes sent/recv'ed off rank + double inter_rank_bytes_sent = 0, inter_rank_bytes_recv = 0; + + auto computeEdgeVolumesAndLoad = [&](ObjIDType obj, LoadType obj_load) { + if (exclude.find(obj) == exclude.end()) { + if (auto it = send_edges_.find(obj); it != send_edges_.end()) { + for (auto const& [target, volume] : it->second) { + vt_debug_print( + verbose, temperedlb, + "computeWorkBreakdown: send obj={}, target={}\n", + obj, target + ); + if ( + cur_objs_.find(target) != cur_objs_.end() or + target.isLocatedOnThisNode() + ) { + intra_rank_bytes_sent += volume; + } else { + inter_rank_bytes_sent += volume; + } + } + } + if (auto it = recv_edges_.find(obj); it != recv_edges_.end()) { + for (auto const& [target, volume] : it->second) { + vt_debug_print( + verbose, temperedlb, + "computeWorkBreakdown: recv obj={}, target={}\n", + obj, target + ); + if ( + cur_objs_.find(target) != cur_objs_.end() or + target.isLocatedOnThisNode() + ) { + intra_rank_bytes_recv += volume; + } else { + inter_rank_bytes_recv += volume; + } + } + } + } + + load += obj_load; + }; + + for (auto const& [obj, obj_load] : objs) { + computeEdgeVolumesAndLoad(obj, obj_load); + } + + for (auto const& [obj, obj_load] : include) { + computeEdgeVolumesAndLoad(obj, obj_load); + } + + double shared_volume = 0; + auto const& shared_blocks_here = getSharedBlocksHere(); + + for (auto const& sid : shared_blocks_here) { + if (auto it = shared_block_edge_.find(sid); it != shared_block_edge_.end()) { + auto const& [home_node, volume] = it->second; + if (home_node != node) { + shared_volume += volume; + } + } else { + vtAbort("Could not find shared edge volume!"); + } + } + + auto const inter_vol = std::max(inter_rank_bytes_sent, 
inter_rank_bytes_recv); + auto const intra_vol = std::max(intra_rank_bytes_sent, intra_rank_bytes_recv); + + WorkBreakdown w; + w.work = computeWork(load, inter_vol, intra_vol, shared_volume); + w.intra_send_vol = intra_rank_bytes_sent; + w.intra_recv_vol = intra_rank_bytes_recv; + w.inter_send_vol = inter_rank_bytes_sent; + w.inter_recv_vol = inter_rank_bytes_recv; + w.shared_vol = shared_volume; + + vt_debug_print( + normal, temperedlb, + "computeWorkBreakdown: load={}, intra sent={}, recv={}," + " inter sent={}, recv={}, shared_vol={}, work={}\n", + load, + intra_rank_bytes_sent, intra_rank_bytes_recv, + inter_rank_bytes_sent, inter_rank_bytes_recv, + shared_volume, w.work + ); + + return w; +} + +double TemperedLB::computeWorkAfterClusterSwap( + NodeType node, NodeInfo const& info, ClusterInfo const& to_remove, + ClusterInfo const& to_add +) { + // Start with the existing work for the node and work backwards to compute the + // new work with the cluster removed + double node_work = info.work; + + // Remove/add clusters' load factor from work model + node_work -= alpha * to_remove.load; + node_work += alpha * to_add.load; + + // Remove/add clusters' intra-comm + double const node_intra_send = info.intra_send_vol; + double const node_intra_recv = info.intra_recv_vol; + node_work -= gamma * std::max(node_intra_send, node_intra_recv); + node_work += gamma * std::max( + node_intra_send - to_remove.intra_send_vol + to_add.intra_send_vol, + node_intra_recv - to_remove.intra_recv_vol + to_add.intra_recv_vol + ); + + // Uninitialized destination means that the cluster is empty + // If to_remove was remote, remove that component from the work + if ( + to_remove.home_node != node and + to_remove.home_node != uninitialized_destination + ) { + node_work -= delta * to_remove.edge_weight; + } + + // If to_add is now remote, add that component to the work + if ( + to_add.home_node != node and + to_add.home_node != uninitialized_destination + ) { + node_work += delta * to_add.edge_weight; + } + + // Update formulae for inter-node communication + double node_inter_send = info.inter_send_vol; + double node_inter_recv = info.inter_recv_vol; + node_work -= beta * std::max(node_inter_send, node_inter_recv); + + // All edges outside the to_remove cluster that are also off the node need to + // be removed from the inter-node volumes + for (auto const& [target, volume] : to_remove.inter_send_vol) { + if (target != node) { + node_inter_send -= volume; + } + } + for (auto const& [target, volume] : to_remove.inter_recv_vol) { + if (target != node) { + node_inter_recv -= volume; + } + } + + // All edges outside the to_add cluster that are now off the node need to + // be added from the inter-node volumes + for (auto const& [target, volume] : to_add.inter_send_vol) { + if (target != node) { + node_inter_send += volume; + } + } + for (auto const& [target, volume] : to_add.inter_recv_vol) { + if (target != node) { + node_inter_recv += volume; + } + } + + node_work += beta * std::max(node_inter_send, node_inter_recv); + + return node_work; } void TemperedLB::doLBStages(LoadType start_imb) { @@ -482,12 +1045,22 @@ void TemperedLB::doLBStages(LoadType start_imb) { auto this_node = theContext()->getNode(); + // Read in memory information if it's available before we do any trials + readClustersMemoryData(); + + if (transfer_type_ == TransferTypeEnum::SwapClusters) { + has_memory_data_ = true; + } + for (trial_ = 0; trial_ < num_trials_; ++trial_) { // Clear out data structures selected_.clear(); underloaded_.clear(); 
load_info_.clear(); + other_rank_clusters_.clear(); + max_load_over_iters_.clear(); is_overloaded_ = is_underloaded_ = false; + ready_to_satisfy_locks_ = false; LoadType best_imb_this_trial = start_imb + 10; @@ -497,18 +1070,117 @@ void TemperedLB::doLBStages(LoadType start_imb) { if (first_iter) { // Copy this node's object assignments to a local, mutable copy cur_objs_.clear(); + int total_num_objs = 0; + int num_migratable_objs = 0; for (auto obj : *load_model_) { + total_num_objs++; if (obj.isMigratable()) { + num_migratable_objs++; cur_objs_[obj] = getModeledValue(obj); } } + + vt_debug_print( + normal, temperedlb, + "TemperedLB::doLBStages: Found {} migratable objects out of {}.\n", + num_migratable_objs, total_num_objs + ); + + send_edges_.clear(); + recv_edges_.clear(); + bool has_comm = false; + auto const& comm = load_model_->getComm( + {balance::PhaseOffset::NEXT_PHASE, balance::PhaseOffset::WHOLE_PHASE} + ); + // vt_print(temperedlb, "comm size={} {}\n", comm.size(), typeid(load_model_).name()); + + for (auto const& [key, volume] : comm) { + // vt_print(temperedlb, "Found comm: volume={}\n", volume.bytes); + // Skip self edges + if (key.selfEdge()) { + continue; + } + + if (key.commCategory() == elm::CommCategory::SendRecv) { + auto const from_obj = key.fromObj(); + auto const to_obj = key.toObj(); + auto const bytes = volume.bytes; + + send_edges_[from_obj].emplace_back(to_obj, bytes); + recv_edges_[to_obj].emplace_back(from_obj, bytes); + has_comm = true; + } else if (key.commCategory() == elm::CommCategory::WriteShared) { + auto const to_node = key.toNode(); + auto const shared_id = key.sharedID(); + auto const bytes = volume.bytes; + shared_block_edge_[shared_id] = std::make_tuple(to_node, bytes); + has_comm = true; + } else if (key.commCategory() == elm::CommCategory::ReadOnlyShared) { + auto const to_node = key.toNode(); + auto const shared_id = key.sharedID(); + auto const bytes = volume.bytes; + shared_block_edge_[shared_id] = std::make_tuple(to_node, bytes); + has_comm = true; + } + } + + runInEpochCollective("checkIfEdgesExist", [&]{ + proxy_.allreduce<&TemperedLB::hasCommAny, collective::OrOp>(has_comm); + }); + + if (has_comm_any_) { + runInEpochCollective("symmEdges", [&]{ + std::unordered_map edges; + + for (auto const& [from_obj, to_edges] : send_edges_) { + for (auto const& [to_obj, volume] : to_edges) { + vt_debug_print( + verbose, temperedlb, + "SymmEdges: from={}, to={}, volume={}\n", + from_obj, to_obj, volume + ); + auto curr_from_node = from_obj.getCurrNode(); + if (curr_from_node != this_node) { + edges[curr_from_node][from_obj].emplace_back(to_obj, volume); + } + auto curr_to_node = to_obj.getCurrNode(); + if (curr_to_node != this_node) { + edges[curr_to_node][from_obj].emplace_back(to_obj, volume); + } + } + } + + for (auto const& [dest_node, edge_map] : edges) { + proxy_[dest_node].template send<&TemperedLB::giveEdges>(edge_map); + } + }); + } + this_new_load_ = this_load; + this_new_breakdown_ = computeWorkBreakdown(this_node, cur_objs_); + this_work = this_new_work_ = this_new_breakdown_.work; + + runInEpochCollective("TemperedLB::doLBStages -> Rank_load_modeled", [=] { + // Perform the reduction for Rank_load_modeled -> processor load only + proxy_.allreduce<&TemperedLB::workStatsHandler, collective::PlusOp>( + std::vector{ + {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}}, + {balance::LoadData{Statistic::Rank_strategy_specific_load_modeled, this_new_work_}} + } + ); + }); + } else { // Clear out data structures from previous 
iteration selected_.clear(); underloaded_.clear(); load_info_.clear(); is_overloaded_ = is_underloaded_ = false; + ready_to_satisfy_locks_ = false; + other_rank_clusters_.clear(); + + // Not clearing shared_block_size_ because this never changes and + // the knowledge might be useful } vt_debug_print( @@ -519,12 +1191,39 @@ void TemperedLB::doLBStages(LoadType start_imb) { LoadType(this_new_load_) ); + if (has_memory_data_) { + double const memory_usage = computeMemoryUsage(); + + vt_debug_print( + terse, temperedlb, + "Current memory info: total memory usage={}, shared blocks here={}, " + "memory_threshold={}\n", memory_usage, + getSharedBlocksHere().size(), mem_thresh_ + ); + + if (memory_usage > mem_thresh_) { + vtAbort("This should never be possible to go over the threshold\n"); + } + + computeClusterSummary(); + + // Verbose printing about local clusters + for (auto const& [shared_id, cluster_info] : cur_clusters_) { + vt_debug_print( + verbose, temperedlb, + "Local cluster: id={}: {}\n", + shared_id, cluster_info + ); + } + } + if (isOverloaded(this_new_load_)) { is_overloaded_ = true; } else if (isUnderloaded(this_new_load_)) { is_underloaded_ = true; } + // Perform requested type of information stage switch (inform_type_) { case InformTypeEnum::SyncInform: informSync(); @@ -536,7 +1235,42 @@ void TemperedLB::doLBStages(LoadType start_imb) { vtAbort("TemperedLB:: Unsupported inform type"); } - decide(); + // Some very verbose printing about all remote clusters we know about that + // we can shut off later + for (auto const& [node, clusters] : other_rank_clusters_) { + for (auto const& [shared_id, cluster_info] : clusters) { + vt_debug_print( + verbose, temperedlb, + "Remote cluster: node={}, id={}, {}\n", + node, shared_id, cluster_info + ); + } + } + + // Move remote cluster information to shared_block_size_ so we have all + // the sizes in the same place + for (auto const& [node, clusters] : other_rank_clusters_) { + for (auto const& [shared_id, cluster_info] : clusters) { + shared_block_size_[shared_id] = cluster_info.bytes; + shared_block_edge_[shared_id] = + std::make_tuple(cluster_info.home_node, cluster_info.edge_weight); + } + } + + // Execute transfer stage + switch (transfer_type_) { + case TransferTypeEnum::Original: + originalTransfer(); + break; + case TransferTypeEnum::Recursive: + vtAbort("TemperedLB:: Unimplemented transfer type: Recursive"); + break; + case TransferTypeEnum::SwapClusters: + swapClusters(); + break; + default: + vtAbort("TemperedLB:: Unsupported transfer type"); + } vt_debug_print( verbose, temperedlb, @@ -546,12 +1280,20 @@ void TemperedLB::doLBStages(LoadType start_imb) { LoadType(this_new_load_) ); - if (rollback_ || theConfig()->vt_debug_temperedlb || (iter_ == num_iters_ - 1)) { + if ( + rollback_ || + theConfig()->vt_debug_temperedlb || + (iter_ == num_iters_ - 1) || + transfer_type_ == TransferTypeEnum::SwapClusters + ) { + this_new_breakdown_ = computeWorkBreakdown(this_node, cur_objs_); + this_new_work_ = this_new_breakdown_.work; runInEpochCollective("TemperedLB::doLBStages -> Rank_load_modeled", [=] { // Perform the reduction for Rank_load_modeled -> processor load only proxy_.allreduce<&TemperedLB::loadStatsHandler, collective::PlusOp>( std::vector{ - {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}} + {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}}, + {balance::LoadData{Statistic::Rank_strategy_specific_load_modeled, this_new_work_}} } ); }); @@ -581,6 +1323,8 @@ void TemperedLB::doLBStages(LoadType 
start_imb) { // Clear out for next try or for not migrating by default cur_objs_.clear(); + send_edges_.clear(); + recv_edges_.clear(); this_new_load_ = this_load; } @@ -597,6 +1341,15 @@ void TemperedLB::doLBStages(LoadType start_imb) { best_trial, new_imbalance_ ); } + + // Skip this block when not using SwapClusters + if (transfer_type_ == TransferTypeEnum::SwapClusters) { + auto remote_block_count = getRemoteBlockCountHere(); + runInEpochCollective("TemperedLB::doLBStages -> compute unhomed", [=] { + proxy_.allreduce<&TemperedLB::remoteBlockCountHandler, + collective::PlusOp>(remote_block_count); + }); + } } else if (this_node == 0) { vt_debug_print( terse, temperedlb, @@ -609,27 +1362,54 @@ void TemperedLB::doLBStages(LoadType start_imb) { thunkMigrations(); } +void TemperedLB::giveEdges(EdgeMapType const& edge_map) { + for (auto const& [from_obj, to_edges] : edge_map) { + for (auto const& [to_obj, volume] : to_edges) { + send_edges_[from_obj].emplace_back(to_obj, volume); + recv_edges_[to_obj].emplace_back(from_obj, volume); + } + } +} + +void TemperedLB::hasCommAny(bool has_comm_any) { + has_comm_any_ = has_comm_any; +} + void TemperedLB::loadStatsHandler(std::vector const& vec) { auto const& in = vec[0]; + auto const& work = vec[1]; new_imbalance_ = in.I(); + work_mean_ = work.avg(); + work_max_ = work.max(); + new_work_imbalance_ = work.I(); + + max_load_over_iters_.push_back(in.max()); + auto this_node = theContext()->getNode(); if (this_node == 0) { vt_debug_print( terse, temperedlb, - "TemperedLB::loadStatsHandler: trial={} iter={} max={} min={} " - "avg={} pole={} imb={:0.4f}\n", - trial_, iter_, LoadType(in.max()), + "TemperedLB::loadStatsHandler: trial={} iter={}" + " Load[max={:0.2f} min={:0.2f} avg={:0.2f} pole={:0.2f} imb={:0.4f}] " + " Work[max={:0.2f} min={:0.2f} avg={:0.2f} imb={:0.4f}]\n", + trial_, iter_, + LoadType(in.max()), LoadType(in.min()), LoadType(in.avg()), LoadType(stats.at( lb::Statistic::Object_load_modeled ).at(lb::StatisticQuantity::max)), - in.I() + in.I(), + LoadType(work.max()), + LoadType(work.min()), LoadType(work.avg()), + work.I() ); } } -void TemperedLB::rejectionStatsHandler(int n_rejected, int n_transfers) { +void TemperedLB::rejectionStatsHandler( + int n_rejected, int n_transfers, int n_unhomed_blocks +) { double rej = static_cast(n_rejected) / static_cast(n_rejected + n_transfers) * 100.0; @@ -637,9 +1417,21 @@ void TemperedLB::rejectionStatsHandler(int n_rejected, int n_transfers) { if (this_node == 0) { vt_debug_print( terse, temperedlb, - "TemperedLB::rejectionStatsHandler: n_transfers={} n_rejected={} " + "TemperedLB::rejectionStatsHandler: n_transfers={} n_unhomed_blocks={}" + " n_rejected={} " "rejection_rate={:0.1f}%\n", - n_transfers, n_rejected, rej + n_transfers, n_unhomed_blocks, n_rejected, rej + ); + } +} + +void TemperedLB::remoteBlockCountHandler(int n_unhomed_blocks) { + auto this_node = theContext()->getNode(); + if (this_node == 0) { + vt_print( + temperedlb, + "After load balancing, {} blocks will be off their home ranks\n", + n_unhomed_blocks ); } } @@ -782,8 +1574,8 @@ void TemperedLB::propagateRound(uint8_t k_cur, bool sync, EpochType epoch) { selected.insert(this_node); } + // Determine fanout factor capped by number of nodes auto const fanout = std::min(f_, static_cast(num_nodes - 1)); - vt_debug_print( verbose, temperedlb, "TemperedLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}, " @@ -791,6 +1583,7 @@ void TemperedLB::propagateRound(uint8_t k_cur, bool sync, EpochType epoch) { trial_, iter_, k_max_, 
k_cur, selected.size(), fanout ); + // Iterate over fanout factor for (int i = 0; i < fanout; i++) { // This implies full knowledge of all processors if (selected.size() >= static_cast(num_nodes)) { @@ -817,20 +1610,40 @@ void TemperedLB::propagateRound(uint8_t k_cur, bool sync, EpochType epoch) { // Send message with load if (sync) { + // Message in synchronous mode auto msg = makeMessage(this_node, load_info_); if (epoch != no_epoch) { envelopeSetEpoch(msg->env, epoch); } - msg->addNodeLoad(this_node, this_new_load_); + NodeInfo info{ + this_new_load_, this_new_work_, + this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol, + this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol, + this_new_breakdown_.shared_vol + }; + msg->addNodeInfo(this_node, info); + if (has_memory_data_) { + msg->addNodeClusters(this_node, rank_bytes_, cur_clusters_); + } proxy_[random_node].sendMsg< LoadMsgSync, &TemperedLB::propagateIncomingSync >(msg.get()); } else { + // Message in asynchronous mode auto msg = makeMessage(this_node, load_info_, k_cur); if (epoch != no_epoch) { envelopeSetEpoch(msg->env, epoch); } - msg->addNodeLoad(this_node, this_new_load_); + NodeInfo info{ + this_new_load_, this_new_work_, + this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol, + this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol, + this_new_breakdown_.shared_vol + }; + msg->addNodeInfo(this_node, info); + if (has_memory_data_) { + msg->addNodeClusters(this_node, rank_bytes_, cur_clusters_); + } proxy_[random_node].sendMsg< LoadMsgAsync, &TemperedLB::propagateIncomingAsync >(msg.get()); @@ -846,14 +1659,26 @@ void TemperedLB::propagateIncomingAsync(LoadMsgAsync* msg) { normal, temperedlb, "TemperedLB::propagateIncomingAsync: trial={}, iter={}, k_max={}, " "k_cur={}, from_node={}, load info size={}\n", - trial_, iter_, k_max_, k_cur_async, from_node, msg->getNodeLoad().size() + trial_, iter_, k_max_, k_cur_async, from_node, msg->getNodeInfo().size() ); - for (auto&& elm : msg->getNodeLoad()) { + auto const this_node = theContext()->getNode(); + for (auto const& [node, rank_summary] : msg->getNodeClusterSummary()) { + if ( + node != this_node and + other_rank_clusters_.find(node) == other_rank_clusters_.end() + ) { + auto const& [rank_working_bytes, clusters] = rank_summary; + other_rank_clusters_[node] = clusters; + other_rank_working_bytes_[node] = rank_working_bytes; + } + } + + for (auto&& elm : msg->getNodeInfo()) { if (load_info_.find(elm.first) == load_info_.end()) { load_info_[elm.first] = elm.second; - if (isUnderloaded(elm.second)) { + if (isUnderloaded(elm.second.load)) { underloaded_.insert(elm.first); } } @@ -880,14 +1705,26 @@ void TemperedLB::propagateIncomingSync(LoadMsgSync* msg) { normal, temperedlb, "TemperedLB::propagateIncomingSync: trial={}, iter={}, k_max={}, " "k_cur={}, from_node={}, load info size={}\n", - trial_, iter_, k_max_, k_cur_, from_node, msg->getNodeLoad().size() + trial_, iter_, k_max_, k_cur_, from_node, msg->getNodeInfo().size() ); - for (auto&& elm : msg->getNodeLoad()) { + auto const this_node = theContext()->getNode(); + for (auto const& [node, rank_summary] : msg->getNodeClusterSummary()) { + if ( + node != this_node and + other_rank_clusters_.find(node) == other_rank_clusters_.end() + ) { + auto const& [rank_working_bytes, clusters] = rank_summary; + other_rank_clusters_[node] = clusters; + other_rank_working_bytes_[node] = rank_working_bytes; + } + } + + for (auto&& elm : msg->getNodeInfo()) { if 
(new_load_info_.find(elm.first) == new_load_info_.end()) { new_load_info_[elm.first] = elm.second; - if (isUnderloaded(elm.second)) { + if (isUnderloaded(elm.second.load)) { new_underloaded_.insert(elm.first); } } @@ -912,9 +1749,6 @@ std::vector TemperedLB::createCMF(NodeSetType const& under) { case CMFTypeEnum::Original: factor = 1.0 / target_max_load_; break; - case CMFTypeEnum::NormBySelf: - factor = 1.0 / this_new_load_; - break; case CMFTypeEnum::NormByMax: case CMFTypeEnum::NormByMaxExcludeIneligible: { @@ -922,7 +1756,7 @@ std::vector TemperedLB::createCMF(NodeSetType const& under) { for (auto&& pe : under) { auto iter = load_info_.find(pe); vtAssert(iter != load_info_.end(), "Node must be in load_info_"); - auto load = iter->second; + auto load = iter->second.load; if (load > l_max) { l_max = load; } @@ -938,7 +1772,7 @@ std::vector TemperedLB::createCMF(NodeSetType const& under) { auto iter = load_info_.find(pe); vtAssert(iter != load_info_.end(), "Node must be in load_info_"); - auto load = iter->second; + auto load = iter->second.load; sum_p += 1. - factor * load; cmf.push_back(sum_p); } @@ -982,7 +1816,7 @@ NodeType TemperedLB::sampleFromCMF( std::vector TemperedLB::makeUnderloaded() const { std::vector under = {}; for (auto&& elm : load_info_) { - if (isUnderloaded(elm.second)) { + if (isUnderloaded(elm.second.load)) { under.push_back(elm.first); } } @@ -998,7 +1832,7 @@ std::vector TemperedLB::makeSufficientlyUnderloaded( std::vector sufficiently_under = {}; for (auto&& elm : load_info_) { bool eval = Criterion(criterion_)( - this_new_load_, elm.second, load_to_accommodate, target_max_load_ + this_new_load_, elm.second.load, load_to_accommodate, target_max_load_ ); if (eval) { sufficiently_under.push_back(elm.first); @@ -1059,7 +1893,7 @@ std::vector TemperedLB::orderObjects( auto single_obj_load = this_new_load; for (auto &obj : cur_objs) { auto obj_load = obj.second; - if (obj_load > over_avg && obj_load < single_obj_load) { + if (obj_load >= over_avg && obj_load < single_obj_load) { single_obj_load = obj_load; } } @@ -1182,11 +2016,13 @@ std::vector TemperedLB::orderObjects( return ordered_obj_ids; } -void TemperedLB::decide() { - auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: decide"); +void TemperedLB::originalTransfer() { + auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: originalTransfer"); + // Initialize transfer and rejection counters int n_transfers = 0, n_rejected = 0; + // Try to migrate objects only from overloaded ranks if (is_overloaded_) { std::vector under = makeUnderloaded(); std::unordered_map migrate_objs; @@ -1219,31 +2055,32 @@ void TemperedLB::decide() { } // Rebuild the CMF with the new loads taken into account auto cmf = createCMF(under); + // Select a node using the CMF auto const selected_node = sampleFromCMF(under, cmf); vt_debug_print( verbose, temperedlb, - "TemperedLB::decide: selected_node={}, load_info_.size()={}\n", + "TemperedLB::originalTransfer: selected_node={}, load_info_.size()={}\n", selected_node, load_info_.size() ); + // Find load of selected node auto load_iter = load_info_.find(selected_node); vtAssert(load_iter != load_info_.end(), "Selected node not found"); + auto& selected_load = load_iter->second.load; - // The load of the node selected - auto& selected_load = load_iter->second; - + // Check if object is migratable and evaluate criterion for proposed transfer + bool is_migratable = obj_id.isMigratable(); bool eval = Criterion(criterion_)( this_new_load_, selected_load, obj_load, target_max_load_ 
); - vt_debug_print( verbose, temperedlb, - "TemperedLB::decide: trial={}, iter={}, under.size()={}, " + "TemperedLB::originalTransfer: trial={}, iter={}, under.size()={}, " "selected_node={}, selected_load={:e}, obj_id={:x}, home={}, " - "obj_load={}, target_max_load={}, this_new_load_={}, " - "criterion={}\n", + "is_migratable()={}, obj_load={}, target_max_load={}, " + "this_new_load_={}, criterion={}\n", trial_, iter_, under.size(), @@ -1251,15 +2088,17 @@ void TemperedLB::decide() { selected_load, obj_id.id, obj_id.getHomeNode(), - LoadType(obj_load), - LoadType(target_max_load_), - LoadType(this_new_load_), + is_migratable, + obj_load, + target_max_load_, + this_new_load_, eval ); - if (eval) { + // Decide about proposed migration based on criterion evaluation + if (is_migratable and eval) { ++n_transfers; - // transfer the object load in seconds + // Transfer the object load in seconds // to match the object load units on the receiving end migrate_objs[selected_node][obj_id] = obj_load; @@ -1284,7 +2123,6 @@ void TemperedLB::decide() { auto node = migration.first; lazyMigrateObjsTo(lazy_epoch, node, migration.second); } - } else { // do nothing (underloaded-based algorithm), waits to get work from // overloaded nodes @@ -1296,9 +2134,574 @@ void TemperedLB::decide() { if (theConfig()->vt_debug_temperedlb) { // compute rejection rate because it will be printed - runInEpochCollective("TemperedLB::decide -> compute rejection", [=] { + runInEpochCollective("TemperedLB::originalTransfer -> compute rejection", [=] { + proxy_.allreduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>( + n_rejected, n_transfers, 0 + ); + }); + } +} + +void TemperedLB::tryLock(NodeType requesting_node, double criterion_value) { + try_locks_.emplace(requesting_node, criterion_value); + + if (ready_to_satisfy_locks_ and not is_locked_) { + satisfyLockRequest(); + } +} + +auto TemperedLB::removeClusterToSend( + SharedIDType shared_id, std::set objs +) { + std::unordered_map give_objs; + std::unordered_map give_obj_shared_block; + std::unordered_map give_shared_blocks_size; + std::unordered_map give_obj_working_bytes; + + vt_debug_print( + verbose, temperedlb, + "removeClusterToSend: shared_id={}\n", + shared_id + ); + + if (shared_id != no_shared_id) { + give_shared_blocks_size[shared_id] = shared_block_size_[shared_id]; + } + + if (objs.size() == 0) { + for (auto const& [obj_id, obj_load] : cur_objs_) { + if (auto iter = obj_shared_block_.find(obj_id); iter != obj_shared_block_.end()) { + if (iter->second == shared_id) { + give_objs[obj_id] = obj_load; + give_obj_shared_block[obj_id] = shared_id; + if ( + auto iter2 = give_obj_working_bytes.find(obj_id); + iter2 != give_obj_working_bytes.end() + ) { + give_obj_working_bytes[obj_id] = iter2->second; + } + } + } + } + } else { + for (auto const& obj_id : objs) { + give_objs[obj_id] = cur_objs_.find(obj_id)->second; + give_obj_shared_block[obj_id] = shared_id; + if ( + auto iter2 = give_obj_working_bytes.find(obj_id); + iter2 != give_obj_working_bytes.end() + ) { + give_obj_working_bytes[obj_id] = iter2->second; + } + } + } + + auto const blocks_here_before = getSharedBlocksHere(); + + for (auto const& [give_obj_id, give_obj_load] : give_objs) { + auto iter = cur_objs_.find(give_obj_id); + vtAssert(iter != cur_objs_.end(), "Object must exist"); + // remove the object! 
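+    // and subtract its load so this_new_load_ stays consistent with the
+    // objects that remain on this rank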
+ cur_objs_.erase(iter); + this_new_load_ -= give_obj_load; + } + + auto const blocks_here_after = getSharedBlocksHere(); + + vt_debug_print( + verbose, temperedlb, + "removeClusterToSend: before count={}, after count={}\n", + blocks_here_before.size(), blocks_here_after.size() + ); + + return std::make_tuple( + give_objs, + give_obj_shared_block, + give_shared_blocks_size, + give_obj_working_bytes + ); +} + +double TemperedLB::loadTransferCriterion( + double before_w_src, double before_w_dst, double after_w_src, + double after_w_dst +) { + // Compute maximum work of original arrangement + auto const w_max_0 = std::max(before_w_src, before_w_dst); + + // Compute maximum work of arrangement after proposed load transfer + auto const w_max_new = std::max(after_w_src, after_w_dst); + + // Return criterion value + return w_max_0 - w_max_new; +} + +void TemperedLB::considerSwapsAfterLock(MsgSharedPtr msg) { + consider_swaps_counter_++; + is_swapping_ = true; + is_locked_ = true; + + vt_debug_print( + verbose, temperedlb, + "considerSwapsAfterLock: consider_swaps_counter_={} start\n", + consider_swaps_counter_ + ); + + auto const this_node = theContext()->getNode(); + + NodeInfo this_info{ + this_new_load_, this_new_work_, + this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol, + this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol, + this_new_breakdown_.shared_vol + }; + + auto criterion = [&,this]( + auto try_rank, auto const& try_info, auto try_mem, + auto try_max_object_working_bytes, + auto try_max_object_serialized_bytes, + auto const& src_cluster, auto const& try_cluster + ) -> double { + BytesType try_new_mem = try_mem; + try_new_mem -= try_cluster.bytes; + try_new_mem += src_cluster.bytes; + try_new_mem -= try_max_object_working_bytes; + try_new_mem += std::max( + try_cluster.max_object_working_bytes_outside, + src_cluster.max_object_working_bytes + ); + try_new_mem -= try_max_object_serialized_bytes; + try_new_mem += std::max( + try_cluster.max_object_serialized_bytes_outside, + src_cluster.max_object_serialized_bytes + ); + try_new_mem -= try_cluster.cluster_footprint; + try_new_mem += src_cluster.cluster_footprint; + + if (try_new_mem > mem_thresh_) { + return - epsilon; + } + + BytesType src_new_mem = current_memory_usage_; + src_new_mem -= src_cluster.bytes; + src_new_mem += try_cluster.bytes; + src_new_mem -= max_object_working_bytes_; + src_new_mem += std::max( + src_cluster.max_object_working_bytes_outside, + try_cluster.max_object_working_bytes + ); + src_new_mem -= max_object_serialized_bytes_; + src_new_mem += std::max( + src_cluster.max_object_serialized_bytes_outside, + try_cluster.max_object_serialized_bytes + ); + src_new_mem += try_cluster.cluster_footprint; + src_new_mem -= src_cluster.cluster_footprint; + + if (src_new_mem > mem_thresh_) { + return - epsilon; + } + + double const src_new_work = + computeWorkAfterClusterSwap(this_node, this_info, src_cluster, try_cluster); + double const dest_new_work = + computeWorkAfterClusterSwap(try_rank, try_info, try_cluster, src_cluster); + + // Return load transfer criterion + return loadTransferCriterion( + this_new_work_, try_info.work, src_new_work, dest_new_work + ); + }; + + auto const& try_clusters = msg->locked_clusters; + auto const& try_rank = msg->locked_node; + auto const& try_total_bytes = msg->locked_bytes; + auto const& try_max_owm = msg->locked_max_object_working_bytes; + auto const& try_max_osm = msg->locked_max_object_serialized_bytes; + auto const& try_info = msg->locked_info; 
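+  // Exhaustively evaluate the criterion for every (local cluster, remote
+  // cluster) pair, including the case of giving a local cluster away for
+  // nothing (the empty try cluster). A candidate is feasible only if the
+  // projected memory usage on both ranks does not exceed mem_thresh_; among
+  // the feasible candidates we keep the one with the largest criterion value,
+  // i.e., the largest reduction in the max work across the two ranks.
+  // Illustrative numbers only: if this rank currently has work 10 and the
+  // locked rank has work 4, and a swap would leave them at 7 and 6, then
+  // loadTransferCriterion(10, 4, 7, 6) = max(10, 4) - max(7, 6) = 3 >= 0,
+  // so that swap would be accepted if it is the best one found.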
+ + double best_c_try = -1.0; + std::tuple best_swap = + {no_shared_id, no_shared_id}; + for (auto const& [src_shared_id, src_cluster] : cur_clusters_) { + // try swapping with empty cluster first + { + ClusterInfo empty_cluster; + double c_try = criterion( + try_rank, try_info, try_total_bytes, try_max_owm, try_max_osm, + src_cluster, empty_cluster + ); + if (c_try >= 0.0) { + if (c_try > best_c_try) { + best_c_try = c_try; + best_swap = std::make_tuple(src_shared_id, no_shared_id); + } + } + } + + for (auto const& [try_shared_id, try_cluster] : try_clusters) { + double c_try = criterion( + try_rank, try_info, try_total_bytes, try_max_owm, try_max_osm, + src_cluster, try_cluster + ); + vt_debug_print( + verbose, temperedlb, + "testing a possible swap (rank {}): {} {} c_try={}\n", + try_rank, src_shared_id, try_shared_id, c_try + ); + if (c_try >= 0.0) { + if (c_try > best_c_try) { + best_c_try = c_try; + best_swap = std::make_tuple(src_shared_id, try_shared_id); + } + } + } + } + + if (best_c_try >= 0) { + // FIXME C++20: use structured binding + auto const src_shared_id = std::get<0>(best_swap); + auto const try_shared_id = std::get<1>(best_swap); + + vt_debug_print( + normal, temperedlb, + "best_c_try={}, swapping {} for {} on rank ={}\n", + best_c_try, src_shared_id, try_shared_id, try_rank + ); + + // FIXME C++20: use structured binding + auto const& give_data = removeClusterToSend(src_shared_id); + auto const& give_objs = std::get<0>(give_data); + auto const& give_obj_shared_block = std::get<1>(give_data); + auto const& give_shared_blocks_size = std::get<2>(give_data); + auto const& give_obj_working_bytes = std::get<3>(give_data); + + runInEpochRooted("giveCluster", [&]{ + vt_debug_print( + verbose, temperedlb, + "considerSwapsAfterLock: giveCluster swapping {} for {}, epoch={:x}\n", + src_shared_id, try_shared_id, theMsg()->getEpoch() + ); + proxy_[try_rank].template send<&TemperedLB::giveCluster>( + this_node, + give_shared_blocks_size, + give_objs, + give_obj_shared_block, + give_obj_working_bytes, + try_shared_id + ); + }); + + computeClusterSummary(); + this_new_breakdown_ = computeWorkBreakdown(this_node, cur_objs_); + this_new_work_ = this_new_breakdown_.work; + computeMemoryUsage(); + + vt_debug_print( + normal, temperedlb, + "best_c_try={}, swap completed with rank={}\n", + best_c_try, try_rank + ); + } + + proxy_[try_rank].template send<&TemperedLB::releaseLock>(); + + vt_debug_print( + verbose, temperedlb, + "considerSwapsAfterLock: consider_swaps_counter_={} finish\n", + consider_swaps_counter_ + ); + + is_swapping_ = false; + is_locked_ = false; + consider_swaps_counter_--; + + if (pending_actions_.size() > 0) { + auto action = pending_actions_.back(); + pending_actions_.pop_back(); + action(); + } +} + +void TemperedLB::giveCluster( + NodeType from_rank, + std::unordered_map const& give_shared_blocks_size, + std::unordered_map const& give_objs, + std::unordered_map const& give_obj_shared_block, + std::unordered_map const& give_obj_working_bytes, + SharedIDType take_cluster +) { + auto const this_node = theContext()->getNode(); + + n_transfers_swap_++; + + vtAssert(give_shared_blocks_size.size() == 1, "Must be one block right now"); + + for (auto const& [obj_id, obj_load] : give_objs) { + this_new_load_ += obj_load; + cur_objs_[obj_id] = obj_load; + } + for (auto const& [id, bytes] : give_shared_blocks_size) { + shared_block_size_[id] = bytes; + } + for (auto const& [obj_id, id] : give_obj_shared_block) { + obj_shared_block_[obj_id] = id; + } + for (auto const& elm : 
give_obj_working_bytes) { + obj_working_bytes_.emplace(elm); + } + + if (take_cluster != no_shared_id) { + auto const& [ + take_objs, + take_obj_shared_block, + take_shared_blocks_size, + take_obj_working_bytes + ] = removeClusterToSend(take_cluster); + + proxy_[from_rank].template send<&TemperedLB::giveCluster>( + this_node, + take_shared_blocks_size, + take_objs, + take_obj_shared_block, + take_obj_working_bytes, + no_shared_id + ); + } + + computeClusterSummary(); + this_new_breakdown_ = computeWorkBreakdown(this_node, cur_objs_); + this_new_work_ = this_new_breakdown_.work; + computeMemoryUsage(); + + vt_debug_print( + normal, temperedlb, + "giveCluster: from_rank={}, epoch={:x} total memory usage={}, shared blocks here={}, " + "memory_threshold={}, give_cluster={}, take_cluster={}\n", + from_rank, theMsg()->getEpoch(), + computeMemoryUsage(), + getSharedBlocksHere().size(), mem_thresh_, + give_shared_blocks_size.begin()->first, take_cluster + ); +} + +void TemperedLB::releaseLock() { + vt_debug_print( + normal, temperedlb, + "releaseLock: pending size={}\n", + pending_actions_.size() + ); + + is_locked_ = false; + locking_rank_ = uninitialized_destination; + + if (pending_actions_.size() > 0) { + auto action = pending_actions_.back(); + pending_actions_.pop_back(); + action(); + } else { + // satisfy another lock + satisfyLockRequest(); + } +} + +void TemperedLB::lockObtained(LockedInfoMsg* in_msg) { + auto msg = promoteMsg(in_msg); + + vt_debug_print( + normal, temperedlb, + "lockObtained: is_locked_={}, is_swapping_={}\n", + is_locked_, is_swapping_ + ); + + auto cur_epoch = theMsg()->getEpoch(); + theTerm()->produce(cur_epoch); + + auto action = [this, msg, cur_epoch]{ + theMsg()->pushEpoch(cur_epoch); + considerSwapsAfterLock(msg); + theMsg()->popEpoch(cur_epoch); + theTerm()->consume(cur_epoch); + }; + + if (is_locked_ && locking_rank_ <= msg->locked_node) { + proxy_[msg->locked_node].template send<&TemperedLB::releaseLock>(); + theTerm()->consume(cur_epoch); + try_locks_.emplace(msg->locked_node, msg->locked_c_try); + //pending_actions_.push_back(action); + } else if (is_locked_) { + pending_actions_.push_back(action); + } else if (is_swapping_) { + pending_actions_.push_back(action); + } else { + vt_debug_print( + normal, temperedlb, + "lockObtained: running action immediately\n" + ); + + + action(); + } +} + +void TemperedLB::satisfyLockRequest() { + vtAssert(not is_locked_, "Must not already be locked to satisfy a request"); + if (try_locks_.size() > 0) { + // find the best lock to give + for (auto&& tl : try_locks_) { + vt_debug_print( + verbose, temperedlb, + "satisfyLockRequest: node={}, c_try={}\n", tl.requesting_node, tl.c_try + ); + } + + auto iter = try_locks_.begin(); + auto lock = *iter; + try_locks_.erase(iter); + + auto const this_node = theContext()->getNode(); + + vt_debug_print( + normal, temperedlb, + "satisfyLockRequest: locked obtained for node={}\n", + lock.requesting_node + ); + + NodeInfo this_info{ + this_new_load_, this_new_work_, + this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol, + this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol, + this_new_breakdown_.shared_vol + }; + + proxy_[lock.requesting_node].template send<&TemperedLB::lockObtained>( + this_node, cur_clusters_, current_memory_usage_, + max_object_working_bytes_, max_object_serialized_bytes_, + lock.c_try, this_info + ); + + is_locked_ = true; + locking_rank_ = lock.requesting_node; + } +} + +void TemperedLB::swapClusters() { + n_transfers_swap_ = 0; + 
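+  // Overview of the SwapClusters transfer stage: using the cluster summaries
+  // learned during the inform stage (other_rank_clusters_), look for a rank
+  // where swapping one of our clusters (or giving one away) would not
+  // increase the larger of the two ranks' work and would respect the memory
+  // threshold. For each promising rank an approximate criterion is evaluated
+  // here and a tryLock request is sent; the rest of the protocol (lock grant,
+  // re-evaluation with up-to-date data, giveCluster, lock release) is message
+  // driven inside lazy_epoch below.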
+ auto const this_node = theContext()->getNode(); + + NodeInfo this_info{ + this_new_load_, this_new_work_, + this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol, + this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol, + this_new_breakdown_.shared_vol + }; + + auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: swapClusters"); + theTerm()->pushEpoch(lazy_epoch); + + auto criterion = [&,this]( + auto try_rank, auto try_mem, auto const& src_cluster, auto const& try_cluster + ) -> double { + + // Necessary but not sufficient check regarding memory bounds + if (try_mem - try_cluster.bytes + src_cluster.bytes > mem_thresh_) { + return - epsilon; + } + + auto const src_mem = current_memory_usage_; + if (src_mem + try_cluster.bytes - src_cluster.bytes > mem_thresh_) { + return - epsilon; + } + + auto const& try_info = load_info_.find(try_rank)->second; + + double const src_new_work = + computeWorkAfterClusterSwap(this_node, this_info, src_cluster, try_cluster); + double const dest_new_work = + computeWorkAfterClusterSwap(try_rank, try_info, try_cluster, src_cluster); + + // Return load transfer criterion + return loadTransferCriterion( + this_new_work_, try_info.work, src_new_work, dest_new_work + ); + }; + + // Identify and message beneficial cluster swaps + for (auto const& [try_rank, try_clusters] : other_rank_clusters_) { + bool found_potential_good_swap = false; + + // Approximate the memory usage on the target + BytesType try_mem = + other_rank_working_bytes_.find(try_rank)->second; + for (auto const& [try_shared_id, try_cluster] : try_clusters) { + try_mem += try_cluster.bytes; + } + + // Iterate over source clusters + for (auto const& [src_shared_id, src_cluster] : cur_clusters_) { + // Compute approximation swap criterion for empty cluster "swap" case + { + ClusterInfo empty_cluster; + double c_try = criterion(try_rank, try_mem, src_cluster, empty_cluster); + if (c_try >= 0.0) { + // Try to obtain lock for feasible swap + found_potential_good_swap = true; + proxy_[try_rank].template send<&TemperedLB::tryLock>(this_node, c_try); + break; + } + } + + // Iterate over target clusters + for (auto const& [try_shared_id, try_cluster] : try_clusters) { + // Decide whether swap is beneficial + double c_try = criterion(try_rank, try_mem, src_cluster, try_cluster); + if (c_try >= 0.0) { + // Try to obtain lock for feasible swap + found_potential_good_swap = true; + proxy_[try_rank].template send<&TemperedLB::tryLock>(this_node, c_try); + break; + } + } // try_clusters + if (found_potential_good_swap) { + break; + } + } // cur_clusters_ + } // other_rank_clusters + + // We have to be very careful here since we will allow some reentrancy here. 
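+  // Turning the scheduler a bounded number of times first lets tryLock and
+  // lockObtained messages that have already arrived be handled before this
+  // rank starts granting its own locks (ready_to_satisfy_locks_ is still
+  // false here); handlers that run during these turns may buffer work in
+  // pending_actions_ if this rank is locked or mid-swap.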
+ constexpr int turn_scheduler_times = 10; + for (int i = 0; i < turn_scheduler_times; i++) { + theSched()->runSchedulerOnceImpl(); + } + + while (not theSched()->workQueueEmpty()) { + theSched()->runSchedulerOnceImpl(); + } + + ready_to_satisfy_locks_ = true; + satisfyLockRequest(); + + // Finalize epoch, we have sent our initial round of messages + // from here everything is message driven + theTerm()->finishedEpoch(lazy_epoch); + theTerm()->popEpoch(lazy_epoch); + vt::runSchedulerThrough(lazy_epoch); + + vt_debug_print( + normal, temperedlb, + "After iteration: total memory usage={}, shared blocks here={}, " + "memory_threshold={}, load={}\n", computeMemoryUsage(), + getSharedBlocksHere().size(), mem_thresh_, this_new_load_ + ); + + // Report on rejection rate in debug mode + if (theConfig()->vt_debug_temperedlb) { + int n_rejected = 0; + auto remote_block_count = getRemoteBlockCountHere(); + runInEpochCollective("TemperedLB::swapClusters -> compute rejection", [=] { proxy_.allreduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>( - n_rejected, n_transfers + n_rejected, n_transfers_swap_, remote_block_count ); }); } diff --git a/src/vt/vrt/collection/balance/temperedlb/temperedlb.h b/src/vt/vrt/collection/balance/temperedlb/temperedlb.h index e6f8854569..f28084973f 100644 --- a/src/vt/vrt/collection/balance/temperedlb/temperedlb.h +++ b/src/vt/vrt/collection/balance/temperedlb/temperedlb.h @@ -59,6 +59,13 @@ namespace vt { namespace vrt { namespace collection { namespace lb { +struct WorkBreakdown { + double work = 0; + double intra_send_vol = 0, intra_recv_vol = 0; + double inter_send_vol = 0, inter_recv_vol = 0; + double shared_vol = 0; +}; + struct TemperedLB : BaseLB { using LoadMsgAsync = balance::LoadMsgAsync; using LoadMsgSync = balance::LoadMsg; @@ -67,6 +74,9 @@ struct TemperedLB : BaseLB { using ReduceMsgType = vt::collective::ReduceNoneMsg; using QuantityType = std::map; using StatisticMapType = std::unordered_map; + using EdgeMapType = std::unordered_map< + elm::ElementIDStruct, std::vector> + >; TemperedLB() = default; TemperedLB(TemperedLB const&) = delete; @@ -90,7 +100,8 @@ struct TemperedLB : BaseLB { void doLBStages(LoadType start_imb); void informAsync(); void informSync(); - void decide(); + void originalTransfer(); + void swapClusters(); void migrate(); void propagateRound(uint8_t k_cur_async, bool sync, EpochType epoch = no_epoch); @@ -114,11 +125,228 @@ struct TemperedLB : BaseLB { void lazyMigrateObjsTo(EpochType epoch, NodeType node, ObjsType const& objs); void inLazyMigrations(balance::LazyMigrationMsg* msg); void loadStatsHandler(std::vector const& vec); - void rejectionStatsHandler(int n_rejected, int n_transfers); + void workStatsHandler(std::vector const& vec); + void rejectionStatsHandler( + int n_rejected, int n_transfers, int n_unhomed_blocks + ); + void remoteBlockCountHandler(int n_unhomed_blocks); void thunkMigrations(); void setupDone(); + /** + * \brief Read the memory data from the user-defined json blocks into data + * structures + */ + void readClustersMemoryData(); + + /** + * \brief Compute the memory usage for current assignment + * + * \return the total memory usage + */ + BytesType computeMemoryUsage(); + + /** + * \brief Get the shared blocks that are located on this node with the current + * object assignment + * + * \return the set of shared blocks here + */ + std::set getSharedBlocksHere() const; + + /** + * \brief Get the number of shared blocks that are located on this node with + * the current object assignment but are not 
homed here + * + * \return the number of unhomed shared blocks here + */ + int getRemoteBlockCountHere() const; + + /** + * \brief Compute the current cluster assignment summary for this rank + */ + void computeClusterSummary(); + + /** + * \brief Try to lock a rank + * + * \param[in] requesting_node the requesting rank asking to lock + * \param[in] criterion_value the criterion evaluation value to compare + */ + void tryLock(NodeType requesting_node, double criterion_value); + + /** + * \struct LockedInfoMsg + * + * \brief The update message that comes from a rank when it is locked. This is + * a message instead of a normal handler so it can be buffered without copying + * it. + */ + struct LockedInfoMsg : vt::Message { + using MessageParentType = vt::Message; + vt_msg_serialize_required(); // locked_clusters_ + + LockedInfoMsg() = default; + LockedInfoMsg( + NodeType in_locked_node, + ClusterSummaryType in_locked_clusters, BytesType in_locked_bytes, + BytesType in_locked_max_object_working_bytes, + BytesType in_locked_max_object_serialized_bytes, + double in_locked_c_try, + NodeInfo in_locked_info + ) : locked_node(in_locked_node), + locked_clusters(in_locked_clusters), + locked_bytes(in_locked_bytes), + locked_max_object_working_bytes(in_locked_max_object_working_bytes), + locked_max_object_serialized_bytes(in_locked_max_object_serialized_bytes), + locked_c_try(in_locked_c_try), + locked_info(in_locked_info) + { } + + template + void serialize(SerializerT& s) { + MessageParentType::serialize(s); + s | locked_node; + s | locked_clusters; + s | locked_bytes; + s | locked_max_object_working_bytes; + s | locked_max_object_serialized_bytes; + s | locked_c_try; + s | locked_info; + } + + /// The node that is locked + NodeType locked_node = uninitialized_destination; + /// The up-to-date summary of the clusters + ClusterSummaryType locked_clusters = {}; + /// The total bytes for the locked node + BytesType locked_bytes = 0; + /// The largest working bytes for the locked node + BytesType locked_max_object_working_bytes = 0; + /// The largest serialized bytes for the locked node + BytesType locked_max_object_serialized_bytes = 0; + /// The approximate criterion value at the time it was locked with possible + /// out-of-date info + double locked_c_try = 0; + /// All the node info + NodeInfo locked_info; + }; + + /** + * \brief Satisfy a lock request (if there is one) + */ + void satisfyLockRequest(); + + /** + * \brief Inform a rank that a lock was obtained + * + * \param[in] msg update message with all the info + */ + void lockObtained(LockedInfoMsg* msg); + + /** + * \brief Compute memory component of tempered transfer criterion + * + * \param[in] try_total_bytes: total memory bytes on target rank + * \param[in] src_bytes: memory bytes to be transferred from source rank + */ + bool memoryTransferCriterion(double try_total_bytes, double src_bytes); + + /** + * \brief Compute load component of tempered transfer criterion + * + * \param[in] before_w_src: original work on source rank + * \param[in] before_w_dst: original work on destination rank + * \param[in] after_w_src: new work on source rank + * \param[in] after_w_dst: new work on destination rank + */ + double loadTransferCriterion( + double before_w_src, double before_w_dst, double after_w_src, + double after_w_dst + ); + + /** + * \brief Compute the amount of work based on the work model + * + * \note Model: α * load + β * inter_comm_bytes + δ * intra_comm_bytes + + * ζ * shared_comm_bytes + γ + * + * \param[in] load the load for a rank + * 
\param[in] inter_comm_bytes the inter-rank communication bytes + * \param[in] intra_comm_bytes the intra-rank communication bytes + * \param[in] shared_comm_bytes the communication bytes associated with shared blocks + * + * \return the amount of work + */ + double computeWork( + double load, + double inter_comm_bytes, double intra_comm_bytes, + double shared_comm_bytes + ) const; + + /** + * \brief Compute work based on a set of objects + * + * \param[in] node the node these objects are mapped to + * \param[in] objs input set of objects + * \param[in] exclude a set of objects to exclude that are in objs + * \param[in] include a map of objects to include that are not in objs + * + * \return the work breakdown for the set of objects + */ + WorkBreakdown computeWorkBreakdown( + NodeType node, + std::unordered_map const& objs, + std::set const& exclude = {}, + std::unordered_map const& include = {} + ); + + double computeWorkAfterClusterSwap( + NodeType node, NodeInfo const& info, ClusterInfo const& to_remove, + ClusterInfo const& to_add + ); + + /** + * \brief Consider possible swaps with all the up-to-date info from a rank + * + * \param[in] msg update message with all the info + */ + void considerSwapsAfterLock(MsgSharedPtr msg); + + /** + * \brief Release a lock on a rank + */ + void releaseLock(); + + /** + * \brief Give a cluster to a rank + * + * \param[in] from_rank the rank it's coming from + * \param[in] give_shared_blocks_size the shared block info for the swap + * \param[in] give_objs the objects given + * \param[in] give_obj_shared_block the shared block the objs are part of + * \param[in] give_obj_working_bytes the working bytes for the objs + * \param[in] take_cluster (optional) a cluster requested in return + */ + void giveCluster( + NodeType from_rank, + std::unordered_map const& give_shared_blocks_size, + std::unordered_map const& give_objs, + std::unordered_map const& give_obj_shared_block, + std::unordered_map const& give_obj_working_bytes, + SharedIDType take_cluster + ); + + /** + * \internal \brief Remove a cluster to send. 
Does all the bookkeeping + * associated with removing the cluster + * + * \param[in] shared_id the shared ID of the cluster to remove + * \param[in] objs the set of objects to send with that shared ID (optional, + * if not specified then send all of them) + * + * \return a tuple with all the information to send to \c giveCluster + */ + auto removeClusterToSend(SharedIDType shared_id, std::set objs = {}); + private: uint16_t f_ = 0; uint8_t k_max_ = 0; @@ -157,8 +385,8 @@ struct TemperedLB : BaseLB { */ bool target_pole_ = false; std::random_device seed_; - std::unordered_map load_info_ = {}; - std::unordered_map new_load_info_ = {}; + std::unordered_map load_info_ = {}; + std::unordered_map new_load_info_ = {}; objgroup::proxy::Proxy proxy_ = {}; bool is_overloaded_ = false; bool is_underloaded_ = false; @@ -166,21 +394,122 @@ struct TemperedLB : BaseLB { std::unordered_set underloaded_ = {}; std::unordered_set new_underloaded_ = {}; std::unordered_map cur_objs_ = {}; + EdgeMapType send_edges_; + EdgeMapType recv_edges_; LoadType this_new_load_ = 0.0; + LoadType this_new_work_ = 0.0; + WorkBreakdown this_new_breakdown_; LoadType new_imbalance_ = 0.0; + LoadType new_work_imbalance_ = 0.0; + LoadType work_mean_ = 0.0; + LoadType work_max_ = 0.0; LoadType target_max_load_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; InformTypeEnum inform_type_ = InformTypeEnum::AsyncInform; + /// Type of strategy to be used in transfer stage + TransferTypeEnum transfer_type_ = TransferTypeEnum::Original; ObjectOrderEnum obj_ordering_ = ObjectOrderEnum::FewestMigrations; CMFTypeEnum cmf_type_ = CMFTypeEnum::NormByMax; KnowledgeEnum knowledge_ = KnowledgeEnum::Log; bool setup_done_ = false; bool propagate_next_round_ = false; + double alpha = 1.0; + double beta = 0.0; + double gamma = 0.0; + double delta = 0.0; + double epsilon = std::numeric_limits::infinity(); std::vector propagated_k_; std::mt19937 gen_propagate_; std::mt19937 gen_sample_; StatisticMapType stats; LoadType this_load = 0.0f; + LoadType this_work = 0.0f; + /// Whether any node has communication data + bool has_comm_any_ = false; + + void hasCommAny(bool has_comm_any); + void giveEdges(EdgeMapType const& edge_map); + + ////////////////////////////////////////////////////////////////////////////// + // All the memory info (may or may not be present) + ////////////////////////////////////////////////////////////////////////////// + + struct TryLock { + TryLock(NodeType in_requesting, double in_c_try) + : requesting_node(in_requesting), + c_try(in_c_try) + { } + + NodeType requesting_node = uninitialized_destination; + double c_try = 0; + + double operator<(TryLock const& other) const { + // sort in reverse order so the best is first! 
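+      // consequently, iterating try_locks_ (a std::set ordered by this
+      // operator) from begin() yields the request with the highest
+      // criterion value first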
+ return c_try > other.c_try; + } + }; + + struct ObjLoad { + ObjLoad(ObjIDType in_obj_id, LoadType in_load) + : obj_id(in_obj_id), + load(in_load) + { } + + ObjIDType obj_id = {}; + LoadType load = 0; + + double operator<(ObjLoad const& other) const { + return load < other.load; + } + }; + + /// Whether we have memory information + bool has_memory_data_ = false; + /// Working bytes for this rank + BytesType rank_bytes_ = 0; + /// Shared ID for each object + std::unordered_map obj_shared_block_; + /// Shared block size in bytes + std::unordered_map shared_block_size_; + /// Shared block edges + std::unordered_map> shared_block_edge_; + /// Working bytes for each object + std::unordered_map obj_working_bytes_; + /// Serialized bytes for each object + std::unordered_map obj_serialized_bytes_; + /// Footprint bytes for each object + std::unordered_map obj_footprint_bytes_; + /// Cluster summary based on current local assignment + ClusterSummaryType cur_clusters_; + /// Clusters that we know of on other ranks (might be out of date) + std::unordered_map other_rank_clusters_; + /// Working bytes for ranks we know about (never out of date) + std::unordered_map other_rank_working_bytes_; + /// User-defined memory threshold + BytesType mem_thresh_ = 0; + /// The max working bytes for an object currently residing here + BytesType max_object_working_bytes_ = 0; + /// The max serialized bytes for an object currently residing here + BytesType max_object_serialized_bytes_ = 0; + /// Current memory usage based on distribution + BytesType current_memory_usage_ = 0; + /// Whether this rank is locked or now + bool is_locked_ = false; + // Which rank locked this rank + NodeType locking_rank_ = uninitialized_destination; + /// Try locks that have arrived from other ranks + std::set try_locks_; + /// Pending operations that are waiting for an unlock + std::list pending_actions_; + /// Number of swaps so far + int n_transfers_swap_ = 0; + /// Whether it's mid-swap or not + bool is_swapping_ = false; + /// Max-load over ranks vector + std::vector max_load_over_iters_; + /// Ready to satify looks + bool ready_to_satisfy_locks_ = false; + int consider_swaps_counter_ = 0; }; }}}} /* end namespace vt::vrt::collection::lb */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e0d10b060e..8cf9666ada 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -215,6 +215,15 @@ foreach(SUB_DIR ${UNIT_TEST_SUBDIRS_LIST}) endif() endforeach() +# Copy synthetic blocks data files to /tests/synthetic-blocks-data +set(SYNTHETIC_BLOCKS_DATA_DEST "${CMAKE_BINARY_DIR}/tests/synthetic-blocks-data") +file(MAKE_DIRECTORY ${SYNTHETIC_BLOCKS_DATA_DEST}) +file(GLOB SYNTHETIC_BLOCKS_DATA_FILES "${CMAKE_SOURCE_DIR}/tests/data/synthetic-blocks/*") +foreach(SYNTHETIC_BLOCKS_DATA_FILE ${SYNTHETIC_BLOCKS_DATA_FILES}) + get_filename_component(FILE_NAME ${SYNTHETIC_BLOCKS_DATA_FILE} NAME) + configure_file(${SYNTHETIC_BLOCKS_DATA_FILE} ${SYNTHETIC_BLOCKS_DATA_DEST} COPYONLY) +endforeach() + # # Performance Tests # diff --git a/tests/data/synthetic-blocks/synthetic-dataset-blocks.0.json b/tests/data/synthetic-blocks/synthetic-dataset-blocks.0.json new file mode 100644 index 0000000000..83cd135ae1 --- /dev/null +++ b/tests/data/synthetic-blocks/synthetic-dataset-blocks.0.json @@ -0,0 +1,159 @@ +{ + "metadata": { + "rank": 0, + "type": "LBDatafile" + }, + "phases": [ + { + "communications": [ + { + "bytes": 2.0, + "from": { + "collection_id": 7, + "home": 0, + "seq_id": 0, + "migratable": true, + "type": "object" + }, + "messages": 
1, + "to": { + "collection_id": 7, + "home": 1, + "seq_id": 5, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + }, + { + "bytes": 1.0, + "from": { + "collection_id": 7, + "home": 0, + "seq_id": 1, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 1, + "seq_id": 4, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + }, + { + "bytes": 1.0, + "from": { + "collection_id": 7, + "home": 0, + "seq_id": 3, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 0, + "seq_id": 2, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + }, + { + "bytes": 0.5, + "from": { + "collection_id": 7, + "home": 0, + "seq_id": 3, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 2, + "seq_id": 8, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + } + ], + "id": 0, + "tasks": [ + { + "entity": { + "collection_id": 7, + "home": 0, + "seq_id": 1, + "migratable": true, + "type": "object" + }, + "node": 0, + "resource": "cpu", + "time": 0.5, + "user_defined": { + "home_rank": 0, + "shared_bytes": 9.0, + "shared_id": 0 + } + }, + { + "entity": { + "collection_id": 7, + "home": 0, + "seq_id": 3, + "migratable": true, + "type": "object" + }, + "node": 0, + "resource": "cpu", + "time": 0.5, + "user_defined": { + "home_rank": 0, + "shared_bytes": 9.0, + "shared_id": 1 + } + }, + { + "entity": { + "collection_id": 7, + "home": 0, + "seq_id": 2, + "migratable": true, + "type": "object" + }, + "node": 0, + "resource": "cpu", + "time": 0.5, + "user_defined": { + "home_rank": 0, + "shared_bytes": 9.0, + "shared_id": 1 + } + }, + { + "entity": { + "collection_id": 7, + "home": 0, + "seq_id": 0, + "migratable": true, + "type": "object" + }, + "node": 0, + "resource": "cpu", + "time": 1.0, + "user_defined": { + "home_rank": 0, + "shared_bytes": 9.0, + "shared_id": 0 + } + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/data/synthetic-blocks/synthetic-dataset-blocks.1.json b/tests/data/synthetic-blocks/synthetic-dataset-blocks.1.json new file mode 100644 index 0000000000..050fd1b1a5 --- /dev/null +++ b/tests/data/synthetic-blocks/synthetic-dataset-blocks.1.json @@ -0,0 +1,140 @@ +{ + "metadata": { + "rank": 1, + "type": "LBDatafile" + }, + "phases": [ + { + "communications": [ + { + "bytes": 2.0, + "from": { + "collection_id": 7, + "home": 1, + "seq_id": 4, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 0, + "seq_id": 1, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + }, + { + "bytes": 2.0, + "from": { + "collection_id": 7, + "home": 1, + "seq_id": 5, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 2, + "seq_id": 8, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + }, + { + "bytes": 1.0, + "from": { + "collection_id": 7, + "home": 1, + "seq_id": 7, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 1, + "seq_id": 6, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + } + ], + "id": 0, + "tasks": [ + { + "entity": { + "collection_id": 7, + "home": 1, + "seq_id": 5, + "migratable": true, + "type": "object" + }, + "node": 1, + "resource": "cpu", + "time": 2.0, + "user_defined": { + "home_rank": 1, + "shared_bytes": 9.0, + "shared_id": 2 + } + }, + { + "entity": { + 
"collection_id": 7, + "home": 1, + "seq_id": 4, + "migratable": true, + "type": "object" + }, + "node": 1, + "resource": "cpu", + "time": 0.5, + "user_defined": { + "home_rank": 1, + "shared_bytes": 9.0, + "shared_id": 2 + } + }, + { + "entity": { + "collection_id": 7, + "home": 1, + "seq_id": 7, + "migratable": true, + "type": "object" + }, + "node": 1, + "resource": "cpu", + "time": 0.5, + "user_defined": { + "home_rank": 1, + "shared_bytes": 9.0, + "shared_id": 3 + } + }, + { + "entity": { + "collection_id": 7, + "home": 1, + "seq_id": 6, + "migratable": true, + "type": "object" + }, + "node": 1, + "resource": "cpu", + "time": 1.0, + "user_defined": { + "home_rank": 1, + "shared_bytes": 9.0, + "shared_id": 3 + } + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/data/synthetic-blocks/synthetic-dataset-blocks.2.json b/tests/data/synthetic-blocks/synthetic-dataset-blocks.2.json new file mode 100644 index 0000000000..39dfa10e8c --- /dev/null +++ b/tests/data/synthetic-blocks/synthetic-dataset-blocks.2.json @@ -0,0 +1,51 @@ +{ + "metadata": { + "rank": 2, + "type": "LBDatafile" + }, + "phases": [ + { + "communications": [ + { + "bytes": 1.5, + "from": { + "collection_id": 7, + "home": 2, + "seq_id": 8, + "migratable": true, + "type": "object" + }, + "messages": 1, + "to": { + "collection_id": 7, + "home": 1, + "seq_id": 6, + "migratable": true, + "type": "object" + }, + "type": "SendRecv" + } + ], + "id": 0, + "tasks": [ + { + "entity": { + "collection_id": 7, + "home": 2, + "seq_id": 8, + "migratable": true, + "type": "object" + }, + "node": 2, + "resource": "cpu", + "time": 1.5, + "user_defined": { + "home_rank": 2, + "shared_bytes": 9.0, + "shared_id": 4 + } + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/data/synthetic-blocks/synthetic-dataset-blocks.3.json b/tests/data/synthetic-blocks/synthetic-dataset-blocks.3.json new file mode 100644 index 0000000000..f2868aa3bd --- /dev/null +++ b/tests/data/synthetic-blocks/synthetic-dataset-blocks.3.json @@ -0,0 +1 @@ +{"metadata":{"type":"LBDatafile","rank":3},"phases":[{"id":0,"tasks":[]}]} diff --git a/tests/unit/collection/test_lb.extended.cc b/tests/unit/collection/test_lb.extended.cc index 2c6379cc38..15511d158b 100644 --- a/tests/unit/collection/test_lb.extended.cc +++ b/tests/unit/collection/test_lb.extended.cc @@ -150,6 +150,11 @@ TEST_P(TestLoadBalancerOther, test_load_balancer_other_keep_last_elm) { runTest(GetParam(), "test_load_balancer_other_keep_last_elm"); } +TEST_P(TestLoadBalancerOther, test_load_balancer_other_run_lb_first_phase) { + vt::theConfig()->vt_lb_run_lb_first_phase = true; + runTest(GetParam(), "test_load_balancer_other_run_lb_first_phase"); +} + TEST_P(TestLoadBalancerGreedy, test_load_balancer_greedy_2) { runTest(GetParam(), "test_load_balancer_greedy_2"); } @@ -159,6 +164,11 @@ TEST_P(TestLoadBalancerGreedy, test_load_balancer_greedy_keep_last_elm) { runTest(GetParam(), "test_load_balancer_greedy_keep_last_elm"); } +TEST_P(TestLoadBalancerGreedy, test_load_balancer_greedy_run_lb_first_phase) { + vt::theConfig()->vt_lb_run_lb_first_phase = true; + runTest(GetParam(), "test_load_balancer_greedy_run_lb_first_phase"); +} + TEST_F(TestLoadBalancerOther, test_make_graph_symmetric) { // setup auto const this_node = theContext()->getNode(); diff --git a/tests/unit/lb/test_temperedlb.cc b/tests/unit/lb/test_temperedlb.cc new file mode 100644 index 0000000000..bb9dd2e652 --- /dev/null +++ b/tests/unit/lb/test_temperedlb.cc @@ -0,0 +1,113 @@ +#include +#include +#include + +#include 
"test_helpers.h" +#include "test_parallel_harness.h" + +namespace vt { namespace tests { namespace unit { namespace lb { + +#if vt_check_enabled(lblite) + +using TestTemperedLB = TestParallelHarness; + +std::string writeTemperedLBConfig( + std::string transfer_strategy, double memory_threshold, double alpha = 1.0, + double beta = 0.0, double gamma = 0.0, double delta = 0.0 +) { + auto const this_node = theContext()->getNode(); + auto config_file = getUniqueFilename(); + if (this_node == 0) { + std::ofstream cfg_file_{config_file.c_str(), std::ofstream::out | std::ofstream::trunc}; + cfg_file_ << "0 TemperedLB iters=10 trials=3 transfer=" << transfer_strategy << + " alpha=" << alpha << + " beta=" << beta << + " gamma=" << gamma << + " delta=" << delta; + if (transfer_strategy == "SwapClusters") { + cfg_file_ << " memory_threshold=" << memory_threshold; + } + cfg_file_.close(); + } + return config_file; +} + +void runTemperedLBTest(std::string config_file, double expected_imb = 0.0) { + // Clear the LB config + vrt::collection::balance::ReadLBConfig::clear(); + + // Set configuration + theConfig()->vt_lb = true; + theConfig()->vt_lb_data_in = true; + theConfig()->vt_lb_file_name = config_file; + theConfig()->vt_lb_data_file_in="synthetic-dataset-blocks.%p.json"; + theConfig()->vt_lb_data_dir_in="synthetic-blocks-data"; + + // Replay load balancing + int initial_phase = 0; + int phases_to_run = 1; + int phase_mod = 0; + vt::vrt::collection::balance::replay::replayWorkloads( + initial_phase, phases_to_run, phase_mod + ); + + // Get information for the last phase + auto phase_info = theLBManager()->getPhaseInfo(); + + // Assert that temperedLB found the correct imbalance + EXPECT_EQ(phase_info->imb_load_post_lb, expected_imb); +} + +// The following tests use expected values found by the MILP + +TEST_F(TestTemperedLB, test_load_only_original) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("Original", 1e8); + runTemperedLBTest(cfg); +} + +TEST_F(TestTemperedLB, test_load_only_swapclusters) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 1e8); + runTemperedLBTest(cfg, 0.25); +} + +TEST_F(TestTemperedLB, test_load_and_memory_swapclusters) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 20); + runTemperedLBTest(cfg, 0.25); +} + +TEST_F(TestTemperedLB, test_load_no_memory_delta_10) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 1e8, 1, 0, 0, 1); + runTemperedLBTest(cfg, 1.0); +} + +TEST_F(TestTemperedLB, test_load_no_memory_delta_01) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 1e8, 1, 0, 0, 0.1); + runTemperedLBTest(cfg, 0.25); +} + +TEST_F(TestTemperedLB, test_load_memory_delta_01) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 20, 1, 0, 0, 0.1); + runTemperedLBTest(cfg, 0.25); +} + +TEST_F(TestTemperedLB, test_load_no_memory_delta_03) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 1e8, 1, 0, 0, 0.3); + runTemperedLBTest(cfg, 1.0); +} + +TEST_F(TestTemperedLB, test_load_memory_delta_03) { + SET_NUM_NODES_CONSTRAINT(4); + auto cfg = writeTemperedLBConfig("SwapClusters", 20, 1, 0, 0, 0.3); + runTemperedLBTest(cfg, 1.0); +} + +#endif + +}}}} /* end namespace vt::tests::unit::lb */ diff --git a/tests/unit/runtime/test_initialization.cc b/tests/unit/runtime/test_initialization.cc index 7f2a923da4..465683f226 100644 --- a/tests/unit/runtime/test_initialization.cc 
+++ b/tests/unit/runtime/test_initialization.cc @@ -541,6 +541,9 @@ void prepareLBDataFiles(const std::string file_name_without_ext) { for (PhaseType i = 0; i < num_phases; i++) { for (auto&& elm : ids[i]) { dh.node_data_[i][elm] = LoadSummary{3}; + std::vector arr = {1}; + VirtualProxyType proxy = 7; + dh.node_idx_[elm] = std::make_tuple(proxy, arr); } }