From 58325ee73b0f01d8feb88d4f2fda7a38b77d27f5 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Oct 2023 08:47:09 -1000 Subject: [PATCH 1/8] A few fixes --- include/wrench/services/compute/batch/BatchComputeService.h | 1 + .../services/compute/bare_metal/BareMetalComputeService.cpp | 3 +-- src/wrench/simgrid_S4U_util/S4U_Daemon.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/wrench/services/compute/batch/BatchComputeService.h b/include/wrench/services/compute/batch/BatchComputeService.h index 65b9d9a893..9f288ab232 100755 --- a/include/wrench/services/compute/batch/BatchComputeService.h +++ b/include/wrench/services/compute/batch/BatchComputeService.h @@ -177,6 +177,7 @@ namespace wrench { std::map available_nodes_to_cores; std::unordered_map host_id_to_names; std::vector compute_hosts; + /* End Resources information in batch */ // Vector of one-shot bare-metal compute services diff --git a/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp b/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp index 66fb91d196..6c79d68466 100755 --- a/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp +++ b/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp @@ -258,8 +258,7 @@ namespace wrench { scratch_space_mount_point) { std::map> specified_compute_resources; for (const auto &h: compute_hosts) { - specified_compute_resources.insert( - std::make_pair(h, std::make_tuple(ComputeService::ALL_CORES, ComputeService::ALL_RAM))); + specified_compute_resources[h] = std::make_tuple(ComputeService::ALL_CORES, ComputeService::ALL_RAM); } initiateInstance(hostname, diff --git a/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp b/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp index f52e2eb50d..30761265ee 100755 --- a/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp +++ b/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp @@ -172,7 +172,7 @@ namespace wrench { // Create the s4u_actor try { this->s4u_actor = simgrid::s4u::Actor::create(this->process_name.c_str(), - S4U_Simulation::get_host_or_vm_by_name(hostname), + this->host, S4U_DaemonActor(this)); } catch (simgrid::Exception &e) { throw std::runtime_error("S4U_Daemon::startDaemon(): SimGrid actor creation failed... 
shouldn't happen."); From 67b86e64502a86f017b330093ecd005cb83b5cd7 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Oct 2023 15:10:48 -1000 Subject: [PATCH 2/8] Replaced more std::string by simgrid::s4u::Host* --- .../compute/batch/BatchComputeService.h | 12 +-- .../wrench/services/compute/batch/BatchJob.h | 6 +- .../homegrown/HomegrownBatchScheduler.h | 2 +- .../ConservativeBackfillingBatchScheduler.h | 2 +- ...vativeBackfillingBatchSchedulerCoreLevel.h | 2 +- .../homegrown/fcfs/FCFSBatchScheduler.h | 2 +- .../compute/batch/BatchComputeService.cpp | 53 ++++++---- .../services/compute/batch/BatchJob.cpp | 6 +- .../ConservativeBackfillingBatchScheduler.cpp | 23 +++-- ...tiveBackfillingBatchSchedulerCoreLevel.cpp | 22 ++--- .../homegrown/fcfs/FCFSBatchScheduler.cpp | 99 +++++++++---------- 11 files changed, 117 insertions(+), 112 deletions(-) diff --git a/include/wrench/services/compute/batch/BatchComputeService.h b/include/wrench/services/compute/batch/BatchComputeService.h index 9f288ab232..2845d03884 100755 --- a/include/wrench/services/compute/batch/BatchComputeService.h +++ b/include/wrench/services/compute/batch/BatchComputeService.h @@ -172,11 +172,11 @@ namespace wrench { /* Resources information in batch */ unsigned long total_num_of_nodes; unsigned long num_cores_per_node; - std::unordered_map nodes_to_cores_map; + std::unordered_map nodes_to_cores_map; std::vector timeslots; - std::map available_nodes_to_cores; - std::unordered_map host_id_to_names; - std::vector compute_hosts; + std::map available_nodes_to_cores; + std::unordered_map host_id_to_names; + std::vector compute_hosts; /* End Resources information in batch */ @@ -263,7 +263,7 @@ namespace wrench { void processAlarmJobTimeout(const std::shared_ptr &batch_job); //free up resources - void freeUpResources(const std::map> &resources); + void freeUpResources(const std::map> &resources); //send call back to the pilot job submitters void sendPilotJobExpirationNotification(const std::shared_ptr &job); @@ -275,7 +275,7 @@ namespace wrench { void processJobSubmission(const std::shared_ptr &job, simgrid::s4u::Mailbox *answer_mailbox); //start a job - void startJob(const std::map> &, const std::shared_ptr &, + void startJob(const std::map> &, const std::shared_ptr &, const std::shared_ptr &, unsigned long, unsigned long, unsigned long); // process a resource request diff --git a/include/wrench/services/compute/batch/BatchJob.h b/include/wrench/services/compute/batch/BatchJob.h index 53e850145d..b1daebb9db 100755 --- a/include/wrench/services/compute/batch/BatchJob.h +++ b/include/wrench/services/compute/batch/BatchJob.h @@ -33,8 +33,8 @@ namespace wrench { unsigned long getRequestedNumNodes() const; std::shared_ptr getCompoundJob(); void setEndingTimestamp(double time_stamp); - std::map> getResourcesAllocated(); - void setAllocatedResources(const std::map> &resources); + std::map> getResourcesAllocated(); + void setAllocatedResources(const std::map> &resources); /** * @brief Set the indices of the allocated nodes @@ -68,7 +68,7 @@ namespace wrench { double begin_time_stamp; double ending_time_stamp; double arrival_time_stamp; - std::map> resources_allocated; + std::map> resources_allocated; std::vector allocated_node_indices; diff --git a/include/wrench/services/compute/batch/batch_schedulers/homegrown/HomegrownBatchScheduler.h b/include/wrench/services/compute/batch/batch_schedulers/homegrown/HomegrownBatchScheduler.h index e9e5a56cda..44a157ba94 100755 --- 
a/include/wrench/services/compute/batch/batch_schedulers/homegrown/HomegrownBatchScheduler.h +++ b/include/wrench/services/compute/batch/batch_schedulers/homegrown/HomegrownBatchScheduler.h @@ -50,7 +50,7 @@ namespace wrench { * @param ram_per_node: amount of RAM * @return a host: map */ - virtual std::map> scheduleOnHosts(unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) = 0; + virtual std::map> scheduleOnHosts(unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) = 0; }; /***********************/ diff --git a/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.h b/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.h index 6e6ef7bded..f3ccf472bb 100755 --- a/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.h +++ b/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.h @@ -37,7 +37,7 @@ namespace wrench { void compactSchedule(); - std::map> scheduleOnHosts(unsigned long, unsigned long, double) override; + std::map> scheduleOnHosts(unsigned long, unsigned long, double) override; std::map getStartTimeEstimates(std::set> set_of_jobs) override; diff --git a/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.h b/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.h index ae1367587f..ccc269b182 100755 --- a/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.h +++ b/include/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.h @@ -37,7 +37,7 @@ namespace wrench { void compactSchedule(); - std::map> scheduleOnHosts(unsigned long, unsigned long, double) override; + std::map> scheduleOnHosts(unsigned long, unsigned long, double) override; std::map getStartTimeEstimates(std::set> set_of_jobs) override; diff --git a/include/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.h b/include/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.h index c0c720fbf4..2aa7d2df66 100755 --- a/include/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.h +++ b/include/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.h @@ -41,7 +41,7 @@ namespace wrench { std::shared_ptr pickNextJobToSchedule(); - std::map> scheduleOnHosts(unsigned long, unsigned long, double) override; + std::map> scheduleOnHosts(unsigned long, unsigned long, double) override; std::map getStartTimeEstimates(std::set> set_of_jobs) override; }; diff --git a/src/wrench/services/compute/batch/BatchComputeService.cpp b/src/wrench/services/compute/batch/BatchComputeService.cpp index bb0a9f6e87..f5f54bbb42 100755 --- a/src/wrench/services/compute/batch/BatchComputeService.cpp +++ b/src/wrench/services/compute/batch/BatchComputeService.cpp @@ -101,10 +101,16 @@ namespace wrench { "BatchComputeService::BatchComputeService(): at least one compute hosts must be provided"); } + // Get the hosts + for (const auto &h : compute_hosts) { + 
this->compute_hosts.push_back(S4U_Simulation::get_host_or_vm_by_name(h)); + } + // Check Platform homogeneity - double num_cores_available = (double) (Simulation::getHostNumCores(*(compute_hosts.begin()))); - double speed = Simulation::getHostFlopRate(*(compute_hosts.begin())); - double ram_available = Simulation::getHostMemoryCapacity(*(compute_hosts.begin())); + auto first_host = *(this->compute_hosts.begin()); + double num_cores_available = (double) (first_host->get_core_count()); + double speed = first_host->get_speed(); + double ram_available = S4U_Simulation::getHostMemoryCapacity(first_host); for (auto const &h: compute_hosts) { // Compute speed @@ -127,14 +133,14 @@ namespace wrench { } } + //create a map for host to cores int i = 0; - for (const auto &h: compute_hosts) { - this->nodes_to_cores_map.insert({h, num_cores_available}); - this->available_nodes_to_cores.insert({h, num_cores_available}); + for (const auto &h: this->compute_hosts) { + this->nodes_to_cores_map[h] = num_cores_available; + this->available_nodes_to_cores[h] = num_cores_available; this->host_id_to_names[i++] = h; } - this->compute_hosts = compute_hosts; this->num_cores_per_node = this->nodes_to_cores_map.begin()->second; this->total_num_of_nodes = compute_hosts.size(); @@ -480,11 +486,11 @@ namespace wrench { /** * @brief Increase resource availabilities based on freed resources * @param resources: a set of tuples as follows: - * - hostname (string) + * - host * - number of cores (unsigned long) * - bytes of RAM (double) */ - void BatchComputeService::freeUpResources(const std::map> &resources) { + void BatchComputeService::freeUpResources(const std::map> &resources) { for (auto r: resources) { this->available_nodes_to_cores[r.first] += std::get<0>(r.second); } @@ -800,9 +806,9 @@ namespace wrench { if ((requested_hosts > this->available_nodes_to_cores.size()) or (requested_num_cores_per_host > - Simulation::getHostNumCores(this->available_nodes_to_cores.begin()->first)) or + this->available_nodes_to_cores.begin()->first->get_core_count()) or (required_ram_per_host > - Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first))) { + S4U_Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first))) { { S4U_Mailbox::dputMessage( answer_mailbox, @@ -972,7 +978,7 @@ namespace wrench { * @param cores_per_node_asked_for */ void BatchComputeService::startJob( - const std::map> &resources, + const std::map> &resources, const std::shared_ptr &compound_job, const std::shared_ptr &batch_job, unsigned long num_nodes_allocated, unsigned long allocated_time, @@ -982,11 +988,16 @@ namespace wrench { num_nodes_allocated, cores_per_node_asked_for); compound_job->pushCallbackMailbox(this->mailbox); + + std::map> resources_by_hostname; + for (auto const &h : resources) { + resources_by_hostname[h.first->get_name()] = h.second; + } auto executor = std::shared_ptr( new BareMetalComputeServiceOneShot( compound_job, this->hostname, - resources, + resources_by_hostname, {{BareMetalComputeServiceProperty::THREAD_STARTUP_OVERHEAD, this->getPropertyValueAsString( BatchComputeServiceProperty::THREAD_STARTUP_OVERHEAD)}}, @@ -1032,34 +1043,34 @@ namespace wrench { } else if (key == "num_cores") { for (const auto &h: this->nodes_to_cores_map) { - dict.insert(std::make_pair(h.first, (double) (h.second))); + dict.insert(std::make_pair(h.first->get_name(), (double) (h.second))); } } else if (key == "num_idle_cores") { // Num idle cores per hosts for (const auto &h: this->available_nodes_to_cores) { - 
dict.insert(std::make_pair(h.first, (double) (h.second))); + dict.insert(std::make_pair(h.first->get_name(), (double) (h.second))); } } else if (key == "flop_rates") { // Flop rate per host for (const auto &h: this->nodes_to_cores_map) { - dict.insert(std::make_pair(h.first, S4U_Simulation::getHostFlopRate(h.first))); + dict.insert(std::make_pair(h.first->get_name(), h.first->get_speed())); } } else if (key == "ram_capacities") { // RAM capacity per host for (const auto &h: this->nodes_to_cores_map) { - dict.insert(std::make_pair(h.first, S4U_Simulation::getHostMemoryCapacity(h.first))); + dict.insert(std::make_pair(h.first->get_name(), S4U_Simulation::getHostMemoryCapacity(h.first))); } } else if (key == "ram_availabilities") { // RAM availability per host (0 if something is running, full otherwise) for (const auto &h: this->available_nodes_to_cores) { - if (h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) { - dict.insert(std::make_pair(h.first, 0.0)); + if ((double)h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) { + dict.insert(std::make_pair(h.first->get_name(), 0.0)); } else { - dict.insert(std::make_pair(h.first, S4U_Simulation::getHostMemoryCapacity(h.first))); + dict.insert(std::make_pair(h.first->get_name(), S4U_Simulation::getHostMemoryCapacity(h.first))); } } } else { @@ -1388,7 +1399,7 @@ namespace wrench { } // Double check that memory requirements of all tasks can be met - if (job->getMinimumRequiredMemory() > Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first)) { + if (job->getMinimumRequiredMemory() > S4U_Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first)) { throw ExecutionException(std::make_shared(job, this->getSharedPtr())); } } diff --git a/src/wrench/services/compute/batch/BatchJob.cpp b/src/wrench/services/compute/batch/BatchJob.cpp index fc53835129..cc4a9bff97 100755 --- a/src/wrench/services/compute/batch/BatchJob.cpp +++ b/src/wrench/services/compute/batch/BatchJob.cpp @@ -171,15 +171,15 @@ namespace wrench { * @brief Get the resources allocated to this BatchComputeService job * @return a list of resource, each as a tuple */ - std::map> BatchJob::getResourcesAllocated() { + std::map> BatchJob::getResourcesAllocated() { return this->resources_allocated; } /** * @brief Set the resources allocated to this BatchComputeService job - * @param resources: a list of resource, each as a tuple + * @param resources: a list of resource, each as a tuple */ - void BatchJob::setAllocatedResources(const std::map> &resources) { + void BatchJob::setAllocatedResources(const std::map> &resources) { if (resources.empty()) { throw std::invalid_argument( "BatchJob::setAllocatedResources(): Empty Resources allocated"); diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp index 0cd9278b7e..ff72eebd3c 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp @@ -228,32 +228,32 @@ namespace wrench { * @return a host: map * */ - std::map> + std::map> ConservativeBackfillingBatchScheduler::scheduleOnHosts(unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) { if (ram_per_node == ComputeService::ALL_RAM) 
{ - ram_per_node = Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); + ram_per_node = S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); } if (cores_per_node == ComputeService::ALL_CORES) { - cores_per_node = Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first); + cores_per_node = cs->available_nodes_to_cores.begin()->first->get_core_count(); } - if (ram_per_node > Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first)) { + if (ram_per_node > S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first)) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too much RAM per host"); } if (num_nodes > cs->available_nodes_to_cores.size()) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too many hosts"); } - if (cores_per_node > Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first)) { + if (cores_per_node > cs->available_nodes_to_cores.begin()->first->get_core_count()) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too many cores per host (asking for " + std::to_string(cores_per_node) + " but hosts have " + - std::to_string(Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first)) + "cores)"); + std::to_string(cs->available_nodes_to_cores.begin()->first->get_core_count()) + "cores)"); } // IMPORTANT: We always give all cores to a job on a node! - cores_per_node = Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first); + cores_per_node = cs->available_nodes_to_cores.begin()->first->get_core_count(); - std::map> resources = {}; - std::vector hosts_assigned = {}; + std::map> resources = {}; + std::vector hosts_assigned = {}; unsigned long host_count = 0; for (auto &available_nodes_to_core: cs->available_nodes_to_cores) { @@ -269,10 +269,9 @@ namespace wrench { } if (resources.size() < num_nodes) { resources = {}; - std::vector::iterator vector_it; // undo! 
- for (vector_it = hosts_assigned.begin(); vector_it != hosts_assigned.end(); vector_it++) { - cs->available_nodes_to_cores[*vector_it] += cores_per_node; + for (auto const &h : hosts_assigned) { + cs->available_nodes_to_cores[h] += cores_per_node; } } diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp index 1babc3f88d..b6ed4faee9 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp @@ -239,32 +239,32 @@ namespace wrench { * @param ram_per_node: amount of RAM * @return a host: map */ - std::map> + std::map> ConservativeBackfillingBatchSchedulerCoreLevel::scheduleOnHosts(unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) { if (ram_per_node == ComputeService::ALL_RAM) { - ram_per_node = Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); + ram_per_node = S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); } if (cores_per_node == ComputeService::ALL_CORES) { - cores_per_node = Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first); + cores_per_node = cs->available_nodes_to_cores.begin()->first->get_core_count(); } - if (ram_per_node > Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first)) { + if (ram_per_node > S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first)) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too much RAM per host"); } if (num_nodes > cs->available_nodes_to_cores.size()) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too many hosts"); } - if (cores_per_node > Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first)) { + if (cores_per_node >cs->available_nodes_to_cores.begin()->first->get_core_count()) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too many cores per host (asking for " + std::to_string(cores_per_node) + " but hosts have " + - std::to_string(Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first)) + "cores)"); + std::to_string(cs->available_nodes_to_cores.begin()->first->get_core_count()) + "cores)"); } // // IMPORTANT: We always give all cores to a job on a node! // cores_per_node = Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first); - std::map> resources = {}; - std::vector hosts_assigned = {}; + std::map> resources = {}; + std::vector hosts_assigned = {}; unsigned long host_count = 0; for (auto &available_nodes_to_core: cs->available_nodes_to_cores) { @@ -280,10 +280,8 @@ namespace wrench { } if (resources.size() < num_nodes) { resources = {}; - std::vector::iterator vector_it; - // undo! 
- for (vector_it = hosts_assigned.begin(); vector_it != hosts_assigned.end(); vector_it++) { - cs->available_nodes_to_cores[*vector_it] += cores_per_node; + for (auto const &h: hosts_assigned) { + cs->available_nodes_to_cores[h] += cores_per_node; } } diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp index 4e3539b21c..93d7657330 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp @@ -37,31 +37,31 @@ namespace wrench { * @param ram_per_node: the job's ram per node * @return A resource list */ - std::map> FCFSBatchScheduler::scheduleOnHosts( + std::map> FCFSBatchScheduler::scheduleOnHosts( unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) { if (ram_per_node == ComputeService::ALL_RAM) { - ram_per_node = Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); + ram_per_node = S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); } if (cores_per_node == ComputeService::ALL_CORES) { - cores_per_node = Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first); + cores_per_node = cs->available_nodes_to_cores.begin()->first->get_core_count(); } - if (ram_per_node > Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first)) { + if (ram_per_node > S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first)) { throw std::runtime_error("FCFSBatchScheduler::findNextJobToSchedule(): Asking for too much RAM per host"); } if (num_nodes > cs->available_nodes_to_cores.size()) { throw std::runtime_error("FCFSBatchScheduler::findNextJobToSchedule(): Asking for too many hosts"); } - if (cores_per_node > Simulation::getHostNumCores(cs->available_nodes_to_cores.begin()->first)) { + if (cores_per_node > cs->available_nodes_to_cores.begin()->first->get_core_count()) { throw std::runtime_error("FCFSBatchScheduler::findNextJobToSchedule(): Asking for too many cores per host"); } - std::map> resources = {}; - std::vector hosts_assigned = {}; + std::map> resources = {}; + std::vector hosts_assigned = {}; auto host_selection_algorithm = this->cs->getPropertyValueAsString(BatchComputeServiceProperty::HOST_SELECTION_ALGORITHM); if (host_selection_algorithm == "FIRSTFIT") { - std::map::iterator map_it; + std::map::iterator map_it; unsigned long host_count = 0; for (map_it = cs->available_nodes_to_cores.begin(); map_it != cs->available_nodes_to_cores.end(); map_it++) { @@ -77,19 +77,18 @@ namespace wrench { } if (resources.size() < num_nodes) { resources = {}; - std::vector::iterator vector_it; - for (vector_it = hosts_assigned.begin(); vector_it != hosts_assigned.end(); vector_it++) { - cs->available_nodes_to_cores[*vector_it] += cores_per_node; + for (auto const &h : hosts_assigned) { + cs->available_nodes_to_cores[h] += cores_per_node; } } } else if (host_selection_algorithm == "BESTFIT") { while (resources.size() < num_nodes) { unsigned long target_slack = 0; - std::string target_host = ""; + simgrid::s4u::Host *target_host = nullptr; unsigned long target_num_cores = 0; for (auto h: cs->available_nodes_to_cores) { - std::string hostname = std::get<0>(h); + auto host = std::get<0>(h); unsigned long num_available_cores = std::get<1>(h); if (num_available_cores < cores_per_node) { continue; @@ -98,21 
+97,20 @@ namespace wrench { unsigned long tentative_target_slack = num_available_cores - tentative_target_num_cores; - if (target_host.empty() || + if (target_host == nullptr || (tentative_target_num_cores > target_num_cores) || ((tentative_target_num_cores == target_num_cores) && (target_slack > tentative_target_slack))) { - target_host = hostname; + target_host = host; target_num_cores = tentative_target_num_cores; target_slack = tentative_target_slack; } } - if (target_host.empty()) { + if (target_host == nullptr) { WRENCH_INFO("Didn't find a suitable host"); resources = {}; - std::vector::iterator it; - for (it = hosts_assigned.begin(); it != hosts_assigned.end(); it++) { - cs->available_nodes_to_cores[*it] += cores_per_node; + for (auto const &h : hosts_assigned) { + cs->available_nodes_to_cores[h] += cores_per_node; } break; } @@ -128,12 +126,12 @@ namespace wrench { cur_host_idx = (cur_host_idx + 1) % cs->available_nodes_to_cores.size(); auto it = cs->compute_hosts.begin(); it = it + cur_host_idx; - std::string cur_host_name = *it; - unsigned long num_available_cores = cs->available_nodes_to_cores[cur_host_name]; + auto cur_host = *it; + unsigned long num_available_cores = cs->available_nodes_to_cores[cur_host]; if (num_available_cores >= cores_per_node) { - cs->available_nodes_to_cores[cur_host_name] -= cores_per_node; - hosts_assigned.push_back(cur_host_name); - resources.insert(std::make_pair(cur_host_name, std::make_tuple(cores_per_node, ram_per_node))); + cs->available_nodes_to_cores[cur_host] -= cores_per_node; + hosts_assigned.push_back(cur_host); + resources.insert(std::make_pair(cur_host, std::make_tuple(cores_per_node, ram_per_node))); if (++host_count >= num_nodes) { break; } @@ -141,9 +139,8 @@ namespace wrench { } while (cur_host_idx != round_robin_host_selector_idx); if (resources.size() < num_nodes) { resources = {}; - std::vector::iterator it; - for (it = hosts_assigned.begin(); it != hosts_assigned.end(); it++) { - cs->available_nodes_to_cores[*it] += cores_per_node; + for (auto const &h : hosts_assigned) { + cs->available_nodes_to_cores[h] += cores_per_node; } } else { round_robin_host_selector_idx = cur_host_idx; @@ -169,35 +166,35 @@ namespace wrench { // Set the available time of each node to zero (i.e., now) // (invariant: for each host, core availabilities are sorted by // non-decreasing available time) - std::map> core_available_times; + std::map> core_available_times; for (auto h: cs->nodes_to_cores_map) { - std::string hostname = h.first; + auto host = h.first; unsigned long num_cores = h.second; std::vector zeros; for (unsigned int i = 0; i < num_cores; i++) { zeros.push_back(0); } - core_available_times.insert(std::make_pair(hostname, zeros)); + core_available_times[host] = zeros; } // Update core availabilities for jobs that are currently running - for (auto job: cs->running_jobs) { + for (auto const &job: cs->running_jobs) { auto batch_job = job.second; double time_to_finish = std::max(0, batch_job->getBeginTimestamp() + - batch_job->getRequestedTime() - - cs->simulation->getCurrentSimulatedDate()); + (double)batch_job->getRequestedTime() - + wrench::Simulation::getCurrentSimulatedDate()); for (auto resource: batch_job->getResourcesAllocated()) { - std::string hostname = resource.first; + auto host = resource.first; unsigned long num_cores = std::get<0>(resource.second); double ram = std::get<1>(resource.second); // Update available_times - double new_available_time = *(core_available_times[hostname].begin() + num_cores - 1) + time_to_finish; + double 
new_available_time = *(core_available_times[host].begin() + num_cores - 1) + time_to_finish; for (unsigned int i = 0; i < num_cores; i++) { - *(core_available_times[hostname].begin() + i) = new_available_time; + *(core_available_times[host].begin() + i) = new_available_time; } // Sort them! - std::sort(core_available_times[hostname].begin(), core_available_times[hostname].end()); + std::sort(core_available_times[host].begin(), core_available_times[host].end()); } } @@ -213,8 +210,8 @@ namespace wrench { #endif // Go through the pending jobs and update core availabilities - for (auto job: this->cs->batch_queue) { - double duration = job->getRequestedTime(); + for (auto const &job: this->cs->batch_queue) { + double duration = (double)job->getRequestedTime(); unsigned long num_hosts = job->getRequestedNumNodes(); unsigned long num_cores_per_host = job->getRequestedCoresPerNode(); @@ -224,7 +221,7 @@ namespace wrench { #endif // Compute the earliest start times on all hosts - std::vector> earliest_start_times; + std::vector> earliest_start_times; for (auto h: core_available_times) { double earliest_start_time = *(h.second.begin() + num_cores_per_host - 1); earliest_start_times.emplace_back(std::make_pair(h.first, earliest_start_time)); @@ -232,7 +229,7 @@ namespace wrench { // Sort the hosts by earliest start times std::sort(earliest_start_times.begin(), earliest_start_times.end(), - [](std::pair const &a, std::pair const &b) { + [](std::pair const &a, std::pair const &b) { return std::get<1>(a) < std::get<1>(b); }); @@ -241,24 +238,24 @@ namespace wrench { // Update the core available times on each host used for the job for (unsigned int i = 0; i < num_hosts; i++) { - std::string hostname = (*(earliest_start_times.begin() + i)).first; + auto host = (*(earliest_start_times.begin() + i)).first; for (unsigned int j = 0; j < num_cores_per_host; j++) { - *(core_available_times[hostname].begin() + j) = earliest_job_start_time + duration; + *(core_available_times[host].begin() + j) = earliest_job_start_time + duration; } - std::sort(core_available_times[hostname].begin(), core_available_times[hostname].end()); + std::sort(core_available_times[host].begin(), core_available_times[host].end()); } // Go through all hosts and make sure that no core is available before earliest_job_start_time // since this is a simple fcfs algorithm with no "jumping ahead" of any kind for (auto h: cs->nodes_to_cores_map) { - std::string hostname = h.first; + auto host = h.first; unsigned long num_cores = h.second; for (unsigned int i = 0; i < num_cores; i++) { - if (*(core_available_times[hostname].begin() + i) < earliest_job_start_time) { - *(core_available_times[hostname].begin() + i) = earliest_job_start_time; + if (*(core_available_times[host].begin() + i) < earliest_job_start_time) { + *(core_available_times[host].begin() + i) = earliest_job_start_time; } } - std::sort(core_available_times[hostname].begin(), core_available_times[hostname].end()); + std::sort(core_available_times[host].begin(), core_available_times[host].end()); } #if 0 @@ -296,7 +293,7 @@ namespace wrench { #endif // Compute the earliest start times on all hosts - std::vector> earliest_start_times; + std::vector> earliest_start_times; for (auto h: core_available_times) { double earliest_start_time = *(h.second.begin() + num_cores_per_host - 1); earliest_start_times.emplace_back(std::make_pair(h.first, earliest_start_time)); @@ -305,7 +302,7 @@ namespace wrench { // Sort the hosts by putative start times std::sort(earliest_start_times.begin(), 
earliest_start_times.end(), - [](std::pair const &a, std::pair const &b) { + [](std::pair const &a, std::pair const &b) { return std::get<1>(a) < std::get<1>(b); }); @@ -315,7 +312,7 @@ namespace wrench { // Note that below we translate predictions back to actual start dates given the current time if (earliest_job_start_time > 0) { - earliest_job_start_time = cs->simulation->getCurrentSimulatedDate() + earliest_job_start_time; + earliest_job_start_time = wrench::Simulation::getCurrentSimulatedDate() + earliest_job_start_time; } predictions.insert(std::make_pair(id, earliest_job_start_time)); } From 43a464d3c64df0955edc9768b3d54277d56370b1 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Oct 2023 15:35:07 -1000 Subject: [PATCH 3/8] Replacing more std::string with simgrid::s4u::Host* --- src/wrench/services/compute/batch/BatchComputeService.cpp | 6 +++--- .../services/compute/batch/BatschedNetworkListener.cpp | 2 +- .../batch_schedulers/batsched/BatschedBatchScheduler.cpp | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/wrench/services/compute/batch/BatchComputeService.cpp b/src/wrench/services/compute/batch/BatchComputeService.cpp index f5f54bbb42..75eac0a28b 100755 --- a/src/wrench/services/compute/batch/BatchComputeService.cpp +++ b/src/wrench/services/compute/batch/BatchComputeService.cpp @@ -1296,9 +1296,9 @@ namespace wrench { unsigned long time_in_seconds = batch_job->getRequestedTime(); unsigned long cores_per_node_asked_for = batch_job->getRequestedCoresPerNode(); - std::map> resources = {}; - std::vector hosts_assigned = {}; - std::map::iterator it; + std::map> resources = {}; + std::vector hosts_assigned = {}; + std::map::iterator it; for (auto node: node_resources) { double ram_capacity = S4U_Simulation::getHostMemoryCapacity( diff --git a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp index c4c90c43fb..1d40445bba 100755 --- a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp +++ b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp @@ -127,7 +127,7 @@ namespace wrench { zmq::message_t request(strlen(this->data_to_send.c_str())); memcpy(request.data(), this->data_to_send.c_str(), strlen(this->data_to_send.c_str())); - socket.send(request); + socket.send(request, zmq::send_flags::none); // Get the reply. zmq::message_t reply; diff --git a/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp index 7145bea809..10bb5e577c 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp @@ -286,7 +286,7 @@ namespace wrench { zmq::message_t request(strlen(data_to_send.c_str())); memcpy(request.data(), data_to_send.c_str(), strlen(data_to_send.c_str())); - socket.send(request); + socket.send(request, zmq::send_flags::none); // Get the reply. 
zmq::message_t reply; @@ -515,13 +515,12 @@ namespace wrench { int count = 0; for (auto it = this->cs->nodes_to_cores_map.begin(); it != this->cs->nodes_to_cores_map.end(); it++) { compute_resources_map["events"][0]["data"]["resources_data"][count]["id"] = std::to_string(count); - compute_resources_map["events"][0]["data"]["resources_data"][count]["name"] = it->first; + compute_resources_map["events"][0]["data"]["resources_data"][count]["name"] = it->first->get_name(); compute_resources_map["events"][0]["data"]["resources_data"][count]["core"] = it->second; compute_resources_map["events"][0]["data"]["resources_data"][count++]["state"] = "idle"; } std::string data = compute_resources_map.dump(); - try { std::shared_ptr network_listener = std::shared_ptr( From 2a8b7ac017fdffb6e96aefc664d8d70da90cdee0 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Oct 2023 16:07:20 -1000 Subject: [PATCH 4/8] Made it all compile with Batsched Removed ZMQ deprecation warnings --- src/wrench/services/compute/batch/BatschedNetworkListener.cpp | 3 ++- .../batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp index 1d40445bba..6b1060daca 100755 --- a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp +++ b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp @@ -137,7 +137,8 @@ namespace wrench { useconds_t trials; for (trials = 0; trials < max_num_trials; trials++) { usleep(100 + 100 * trials * trials); - int ret = socket.recv(&reply, ZMQ_DONTWAIT); +// int ret = socket.recv(&reply, ZMQ_DONTWAIT); + zmq::recv_result_t ret = socket.recv(reply, zmq::recv_flags::dontwait); if (ret > 0) { break; } diff --git a/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp index 10bb5e577c..b9cb2115a7 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp @@ -290,7 +290,7 @@ namespace wrench { // Get the reply. 
zmq::message_t reply; - socket.recv(&reply); + auto ret = socket.recv(reply, zmq::recv_flags::none); // Process the reply std::string reply_data; From e83c68e5bc7bb0827609db562e86b5a3433e6db3 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Oct 2023 16:25:24 -1000 Subject: [PATCH 5/8] Cleanup --- .../services/compute/batch/BatchComputeService.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/wrench/services/compute/batch/BatchComputeService.cpp b/src/wrench/services/compute/batch/BatchComputeService.cpp index 75eac0a28b..b397f3418d 100755 --- a/src/wrench/services/compute/batch/BatchComputeService.cpp +++ b/src/wrench/services/compute/batch/BatchComputeService.cpp @@ -41,7 +41,7 @@ WRENCH_LOG_CATEGORY(wrench_core_batch_service, "Log category for Batch Service") namespace wrench { // Do not remove - BatchComputeService::~BatchComputeService() {} + BatchComputeService::~BatchComputeService() = default; /** * @brief Constructor @@ -108,25 +108,25 @@ namespace wrench { // Check Platform homogeneity auto first_host = *(this->compute_hosts.begin()); - double num_cores_available = (double) (first_host->get_core_count()); + auto num_cores_available = (first_host->get_core_count()); double speed = first_host->get_speed(); double ram_available = S4U_Simulation::getHostMemoryCapacity(first_host); - for (auto const &h: compute_hosts) { + for (auto const &h: this->compute_hosts) { // Compute speed - if (std::abs(speed - Simulation::getHostFlopRate(h)) > DBL_EPSILON) { + if (std::abs(speed - h->get_speed()) > DBL_EPSILON) { throw std::invalid_argument( "BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need " "to be homogeneous (different flop rates detected)"); } // RAM - if (std::abs(ram_available - Simulation::getHostMemoryCapacity(h)) > DBL_EPSILON) { + if (std::abs(ram_available - S4U_Simulation::getHostMemoryCapacity(h)) > DBL_EPSILON) { throw std::invalid_argument( "BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need " "to be homogeneous (different RAM capacities detected)"); } // Num cores - if ((double) (Simulation::getHostNumCores(h)) != num_cores_available) { + if ((double)(h->get_core_count()) != num_cores_available) { throw std::invalid_argument( "BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need " "to be homogeneous (different RAM capacities detected)"); From ec759de57477bce2f1500f6ba0ac2563ef592ce9 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Oct 2023 20:28:32 -1000 Subject: [PATCH 6/8] Fixing round-robin batch algorithm --- .../compute/batch/BatchComputeService.h | 4 ++-- .../compute/batch/BatchComputeService.cpp | 4 ++++ .../homegrown/fcfs/FCFSBatchScheduler.cpp | 5 ++++- .../BatchServiceTest.cpp | 22 +++++++++++-------- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/include/wrench/services/compute/batch/BatchComputeService.h b/include/wrench/services/compute/batch/BatchComputeService.h index 2845d03884..4b948f04dd 100755 --- a/include/wrench/services/compute/batch/BatchComputeService.h +++ b/include/wrench/services/compute/batch/BatchComputeService.h @@ -172,10 +172,10 @@ namespace wrench { /* Resources information in batch */ unsigned long total_num_of_nodes; unsigned long num_cores_per_node; - std::unordered_map nodes_to_cores_map; + std::map nodes_to_cores_map; std::vector timeslots; std::map available_nodes_to_cores; - std::unordered_map host_id_to_names; + std::map 
host_id_to_names; std::vector compute_hosts; /* End Resources information in batch */ diff --git a/src/wrench/services/compute/batch/BatchComputeService.cpp b/src/wrench/services/compute/batch/BatchComputeService.cpp index b397f3418d..8e864b12b2 100755 --- a/src/wrench/services/compute/batch/BatchComputeService.cpp +++ b/src/wrench/services/compute/batch/BatchComputeService.cpp @@ -106,6 +106,10 @@ namespace wrench { this->compute_hosts.push_back(S4U_Simulation::get_host_or_vm_by_name(h)); } +// for (const auto &h : this->compute_hosts) { +// std::cerr << "==> " << h->get_name() << "\n"; +// } + // Check Platform homogeneity auto first_host = *(this->compute_hosts.begin()); auto num_cores_available = (first_host->get_core_count()); diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp index 93d7657330..68ee653964 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp @@ -39,6 +39,9 @@ namespace wrench { */ std::map> FCFSBatchScheduler::scheduleOnHosts( unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) { + + std::cerr << "IN SCHEDULE ON HOST\n"; + if (ram_per_node == ComputeService::ALL_RAM) { ram_per_node = S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); } @@ -119,7 +122,7 @@ namespace wrench { resources.insert(std::make_pair(target_host, std::make_tuple(cores_per_node, ComputeService::ALL_RAM))); } } else if (host_selection_algorithm == "ROUNDROBIN") { - static unsigned long round_robin_host_selector_idx = 0; + static unsigned long round_robin_host_selector_idx = -1; unsigned long cur_host_idx = round_robin_host_selector_idx; unsigned long host_count = 0; do { diff --git a/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp b/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp index b79be6779a..fb3f7ce384 100755 --- a/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp +++ b/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp @@ -2294,12 +2294,12 @@ class RoundRobinStandardJobSubmissionTestWMS : public wrench::ExecutionControlle auto job4 = job_manager->createStandardJob(task4); - std::map batch_job_args; - batch_job_args["-N"] = "2"; - batch_job_args["-t"] = "60";//time in seconds - batch_job_args["-c"] = "2"; //number of cores per node + std::map task1_batch_job_args; + task1_batch_job_args["-N"] = "2"; + task1_batch_job_args["-t"] = "60";//time in seconds + task1_batch_job_args["-c"] = "2"; //number of cores per node try { - job_manager->submitJob(job, this->test->compute_service, batch_job_args); + job_manager->submitJob(job, this->test->compute_service, task1_batch_job_args); } catch (wrench::ExecutionException &e) { throw std::runtime_error( "Exception: " + std::string(e.what())); @@ -2338,7 +2338,7 @@ class RoundRobinStandardJobSubmissionTestWMS : public wrench::ExecutionControlle "Exception: " + std::string(e.what())); } - //wait for two standard job completion events + //wait for standard job completion events int num_events = 0; while (num_events < 4) { std::shared_ptr event; @@ -2371,7 +2371,11 @@ class RoundRobinStandardJobSubmissionTestWMS : public wrench::ExecutionControlle //however let's check further if the task1 
hostname is equal to the task4 hostname if (task1->getExecutionHost() != task4->getExecutionHost()) { throw std::runtime_error( - "BatchServiceTest::ROUNDROBINTEST():: The tasks did not execute on the right hosts"); + "BatchServiceTest::ROUNDROBINTEST():: The tasks did not execute on the right hosts: " + + task1->getExecutionHost() + "-" + + task2->getExecutionHost() + "-" + + task3->getExecutionHost() + "-" + + task4->getExecutionHost()); } } @@ -2444,10 +2448,10 @@ TEST_F(BatchServiceTest, RoundRobinTaskTest) void BatchServiceTest::do_RoundRobinTask_test() { // Create and initialize a simulation auto simulation = wrench::Simulation::createSimulation(); - int argc = 1; + int argc = 2; char **argv = (char **) calloc(argc, sizeof(char *)); argv[0] = strdup("unit_test"); - // argv[1] = strdup("--wrench-full-log"); + argv[1] = strdup("--wrench-full-log"); ASSERT_NO_THROW(simulation->init(&argc, argv)); From 558b010c38deb7b6a1c9c3c11e7e0a29d94eb87f Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 6 Oct 2023 07:45:35 +0000 Subject: [PATCH 7/8] Committing clang-format changes --- .../services/compute/batch/BatchComputeService.cpp | 14 +++++++------- .../compute/batch/BatschedNetworkListener.cpp | 2 +- .../ConservativeBackfillingBatchScheduler.cpp | 2 +- ...servativeBackfillingBatchSchedulerCoreLevel.cpp | 2 +- .../homegrown/fcfs/FCFSBatchScheduler.cpp | 12 ++++++------ .../BatchServiceTest.cpp | 2 +- 6 files changed, 17 insertions(+), 17 deletions(-) mode change 100755 => 100644 src/wrench/services/compute/batch/BatchComputeService.cpp mode change 100755 => 100644 src/wrench/services/compute/batch/BatschedNetworkListener.cpp mode change 100755 => 100644 src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp mode change 100755 => 100644 src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp mode change 100755 => 100644 src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp mode change 100755 => 100644 test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp diff --git a/src/wrench/services/compute/batch/BatchComputeService.cpp b/src/wrench/services/compute/batch/BatchComputeService.cpp old mode 100755 new mode 100644 index 8e864b12b2..37ec5ca903 --- a/src/wrench/services/compute/batch/BatchComputeService.cpp +++ b/src/wrench/services/compute/batch/BatchComputeService.cpp @@ -102,13 +102,13 @@ namespace wrench { } // Get the hosts - for (const auto &h : compute_hosts) { + for (const auto &h: compute_hosts) { this->compute_hosts.push_back(S4U_Simulation::get_host_or_vm_by_name(h)); } -// for (const auto &h : this->compute_hosts) { -// std::cerr << "==> " << h->get_name() << "\n"; -// } + // for (const auto &h : this->compute_hosts) { + // std::cerr << "==> " << h->get_name() << "\n"; + // } // Check Platform homogeneity auto first_host = *(this->compute_hosts.begin()); @@ -130,7 +130,7 @@ namespace wrench { "to be homogeneous (different RAM capacities detected)"); } // Num cores - if ((double)(h->get_core_count()) != num_cores_available) { + if ((double) (h->get_core_count()) != num_cores_available) { throw std::invalid_argument( "BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need " "to be homogeneous (different RAM capacities detected)"); @@ -994,7 +994,7 @@ namespace wrench { compound_job->pushCallbackMailbox(this->mailbox); std::map> 
resources_by_hostname; - for (auto const &h : resources) { + for (auto const &h: resources) { resources_by_hostname[h.first->get_name()] = h.second; } auto executor = std::shared_ptr( @@ -1071,7 +1071,7 @@ namespace wrench { } else if (key == "ram_availabilities") { // RAM availability per host (0 if something is running, full otherwise) for (const auto &h: this->available_nodes_to_cores) { - if ((double)h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) { + if ((double) h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) { dict.insert(std::make_pair(h.first->get_name(), 0.0)); } else { dict.insert(std::make_pair(h.first->get_name(), S4U_Simulation::getHostMemoryCapacity(h.first))); diff --git a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp old mode 100755 new mode 100644 index 6b1060daca..3d7ab37f31 --- a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp +++ b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp @@ -137,7 +137,7 @@ namespace wrench { useconds_t trials; for (trials = 0; trials < max_num_trials; trials++) { usleep(100 + 100 * trials * trials); -// int ret = socket.recv(&reply, ZMQ_DONTWAIT); + // int ret = socket.recv(&reply, ZMQ_DONTWAIT); zmq::recv_result_t ret = socket.recv(reply, zmq::recv_flags::dontwait); if (ret > 0) { break; diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp old mode 100755 new mode 100644 index ff72eebd3c..2ba44078a9 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/ConservativeBackfillingBatchScheduler.cpp @@ -270,7 +270,7 @@ namespace wrench { if (resources.size() < num_nodes) { resources = {}; // undo! 
- for (auto const &h : hosts_assigned) { + for (auto const &h: hosts_assigned) { cs->available_nodes_to_cores[h] += cores_per_node; } } diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp old mode 100755 new mode 100644 index b6ed4faee9..976a236a2f --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf_core_level/ConservativeBackfillingBatchSchedulerCoreLevel.cpp @@ -254,7 +254,7 @@ namespace wrench { if (num_nodes > cs->available_nodes_to_cores.size()) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too many hosts"); } - if (cores_per_node >cs->available_nodes_to_cores.begin()->first->get_core_count()) { + if (cores_per_node > cs->available_nodes_to_cores.begin()->first->get_core_count()) { throw std::runtime_error("CONSERVATIVE_BFBatchScheduler::findNextJobToSchedule(): Asking for too many cores per host (asking for " + std::to_string(cores_per_node) + " but hosts have " + std::to_string(cs->available_nodes_to_cores.begin()->first->get_core_count()) + "cores)"); diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp old mode 100755 new mode 100644 index 68ee653964..cf904b8169 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp @@ -80,7 +80,7 @@ namespace wrench { } if (resources.size() < num_nodes) { resources = {}; - for (auto const &h : hosts_assigned) { + for (auto const &h: hosts_assigned) { cs->available_nodes_to_cores[h] += cores_per_node; } } @@ -112,7 +112,7 @@ namespace wrench { if (target_host == nullptr) { WRENCH_INFO("Didn't find a suitable host"); resources = {}; - for (auto const &h : hosts_assigned) { + for (auto const &h: hosts_assigned) { cs->available_nodes_to_cores[h] += cores_per_node; } break; @@ -142,7 +142,7 @@ namespace wrench { } while (cur_host_idx != round_robin_host_selector_idx); if (resources.size() < num_nodes) { resources = {}; - for (auto const &h : hosts_assigned) { + for (auto const &h: hosts_assigned) { cs->available_nodes_to_cores[h] += cores_per_node; } } else { @@ -169,7 +169,7 @@ namespace wrench { // Set the available time of each node to zero (i.e., now) // (invariant: for each host, core availabilities are sorted by // non-decreasing available time) - std::map> core_available_times; + std::map> core_available_times; for (auto h: cs->nodes_to_cores_map) { auto host = h.first; unsigned long num_cores = h.second; @@ -185,7 +185,7 @@ namespace wrench { for (auto const &job: cs->running_jobs) { auto batch_job = job.second; double time_to_finish = std::max(0, batch_job->getBeginTimestamp() + - (double)batch_job->getRequestedTime() - + (double) batch_job->getRequestedTime() - wrench::Simulation::getCurrentSimulatedDate()); for (auto resource: batch_job->getResourcesAllocated()) { auto host = resource.first; @@ -214,7 +214,7 @@ namespace wrench { // Go through the pending jobs and update core availabilities for (auto const &job: 
this->cs->batch_queue) { - double duration = (double)job->getRequestedTime(); + double duration = (double) job->getRequestedTime(); unsigned long num_hosts = job->getRequestedNumNodes(); unsigned long num_cores_per_host = job->getRequestedCoresPerNode(); diff --git a/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp b/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp old mode 100755 new mode 100644 index fb3f7ce384..ce99d83141 --- a/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp +++ b/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp @@ -2451,7 +2451,7 @@ void BatchServiceTest::do_RoundRobinTask_test() { int argc = 2; char **argv = (char **) calloc(argc, sizeof(char *)); argv[0] = strdup("unit_test"); - argv[1] = strdup("--wrench-full-log"); + argv[1] = strdup("--wrench-full-log"); ASSERT_NO_THROW(simulation->init(&argc, argv)); From d55dc5827379cc6a8ac14974a75144a9e6db2c61 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 6 Oct 2023 07:56:11 -1000 Subject: [PATCH 8/8] debug output -- --- .../batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp | 2 -- .../batch_standard_and_pilot_jobs/BatchServiceTest.cpp | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp index 68ee653964..0af1e6d314 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp @@ -40,8 +40,6 @@ namespace wrench { std::map> FCFSBatchScheduler::scheduleOnHosts( unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) { - std::cerr << "IN SCHEDULE ON HOST\n"; - if (ram_per_node == ComputeService::ALL_RAM) { ram_per_node = S4U_Simulation::getHostMemoryCapacity(cs->available_nodes_to_cores.begin()->first); } diff --git a/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp b/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp index fb3f7ce384..762a607fee 100755 --- a/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp +++ b/test/services/compute_services/batch_standard_and_pilot_jobs/BatchServiceTest.cpp @@ -2448,10 +2448,10 @@ TEST_F(BatchServiceTest, RoundRobinTaskTest) void BatchServiceTest::do_RoundRobinTask_test() { // Create and initialize a simulation auto simulation = wrench::Simulation::createSimulation(); - int argc = 2; + int argc = 1; char **argv = (char **) calloc(argc, sizeof(char *)); argv[0] = strdup("unit_test"); - argv[1] = strdup("--wrench-full-log"); +// argv[1] = strdup("--wrench-full-log"); ASSERT_NO_THROW(simulation->init(&argc, argv));
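
Patches 3 and 4 above migrate the Batsched socket code from the deprecated cppzmq calls (socket.send(request) and socket.recv(&reply, ZMQ_DONTWAIT)) to the flag-based overloads. The following is a minimal standalone sketch of that request/reply pattern using only the non-deprecated API; the endpoint and JSON payload are placeholders, not the actual Batsched protocol.

#include <cstring>
#include <iostream>
#include <string>
#include <zmq.hpp>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::req);
    socket.connect("tcp://localhost:28000");  // placeholder endpoint

    // Build and send the request (flag-based overload instead of the deprecated send(request)).
    std::string data_to_send = R"({"now": 0.0, "events": []})";  // placeholder payload
    zmq::message_t request(data_to_send.size());
    std::memcpy(request.data(), data_to_send.data(), data_to_send.size());
    socket.send(request, zmq::send_flags::none);

    // Blocking receive; recv() now returns a zmq::recv_result_t rather than an int.
    zmq::message_t reply;
    zmq::recv_result_t ret = socket.recv(reply, zmq::recv_flags::none);
    if (ret.has_value()) {
        std::cout << std::string(static_cast<char *>(reply.data()), reply.size()) << std::endl;
    }

    // Non-blocking variant, as used in BatschedNetworkListener's retry loop:
    // zmq::recv_result_t r = socket.recv(reply, zmq::recv_flags::dontwait);
    return 0;
}

Assuming cppzmq 4.3.1 or later, these overloads are the supported path, which is why the patches no longer trigger deprecation warnings.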
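
Patch 6 changes the FCFS scheduler's static round_robin_host_selector_idx from 0 to -1 so that the very first round-robin pick lands on the first compute host rather than the second. Below is a small standalone illustration of the unsigned wrap-around this relies on, with placeholder host names rather than the scheduler's real data structures.

#include <iostream>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> hosts = {"Host1", "Host2", "Host3", "Host4"};  // placeholder names
    // Assigning -1 to an unsigned long yields ULONG_MAX, as in the patched scheduler.
    unsigned long last_used_idx = -1;

    // Mimic the scheduler's "advance, then use" step for three consecutive picks.
    for (int pick = 0; pick < 3; pick++) {
        last_used_idx = (last_used_idx + 1) % hosts.size();  // ULONG_MAX + 1 wraps to 0, so Host1 is picked first
        std::cout << "pick " << pick << ": " << hosts[last_used_idx] << "\n";
    }
    return 0;
}

With the previous initialization to 0, the loop advanced the index before using it, so the second host received the first allocation; that appears to be what the adjusted RoundRobin test expectations in patch 6 account for.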