
Merge branch 'master' into simgrid_master
henricasanova committed Oct 6, 2023
2 parents e007c55 + 6c4a992, commit c735499
Showing 16 changed files with 152 additions and 138 deletions.
13 changes: 7 additions & 6 deletions include/wrench/services/compute/batch/BatchComputeService.h
@@ -172,11 +172,12 @@ namespace wrench {
/* Resources information in batch */
unsigned long total_num_of_nodes;
unsigned long num_cores_per_node;
std::unordered_map<std::string, unsigned long> nodes_to_cores_map;
std::map<simgrid::s4u::Host *, unsigned long> nodes_to_cores_map;
std::vector<double> timeslots;
std::map<std::string, unsigned long> available_nodes_to_cores;
std::unordered_map<unsigned long, std::string> host_id_to_names;
std::vector<std::string> compute_hosts;
std::map<simgrid::s4u::Host *, unsigned long> available_nodes_to_cores;
std::map<unsigned long, simgrid::s4u::Host *> host_id_to_names;
std::vector<simgrid::s4u::Host *> compute_hosts;

/* End Resources information in batch */

// Vector of one-shot bare-metal compute services
@@ -262,7 +263,7 @@ namespace wrench {
void processAlarmJobTimeout(const std::shared_ptr<BatchJob> &batch_job);

//free up resources
void freeUpResources(const std::map<std::string, std::tuple<unsigned long, double>> &resources);
void freeUpResources(const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &resources);

//send call back to the pilot job submitters
void sendPilotJobExpirationNotification(const std::shared_ptr<PilotJob> &job);
@@ -274,7 +275,7 @@ namespace wrench {
void processJobSubmission(const std::shared_ptr<BatchJob> &job, simgrid::s4u::Mailbox *answer_mailbox);

//start a job
void startJob(const std::map<std::string, std::tuple<unsigned long, double>> &, const std::shared_ptr<CompoundJob> &,
void startJob(const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &, const std::shared_ptr<CompoundJob> &,
const std::shared_ptr<BatchJob> &, unsigned long, unsigned long, unsigned long);

// process a resource request
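The header change above replaces hostname strings with simgrid::s4u::Host pointers as map keys, so per-host bookkeeping no longer goes through name lookups. The following stand-alone sketch illustrates the keying pattern, assuming SimGrid's s4u API; the platform file, the 1 GB RAM figure, and the variable names are illustrative only, not part of WRENCH:

#include <simgrid/s4u.hpp>
#include <cstdio>
#include <map>
#include <tuple>

// Allocation keyed directly by Host*, as in the updated BatchComputeService.h:
// each host maps to a <number of cores, bytes of RAM> tuple.
using HostAllocation = std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>>;

int main(int argc, char **argv) {
    simgrid::s4u::Engine engine(&argc, argv);
    engine.load_platform(argv[1]);  // expects a platform XML file path as argv[1]

    HostAllocation allocation;
    for (auto *host : engine.get_all_hosts()) {
        // Hypothetically allocate every core and 1 GB of RAM on each host
        allocation[host] = std::make_tuple((unsigned long) host->get_core_count(), 1e9);
    }

    // Convert back to a hostname only at boundaries that still need strings
    // (logging, or interfaces that have not been converted yet)
    for (const auto &entry : allocation) {
        printf("%s: %lu cores\n", entry.first->get_cname(), std::get<0>(entry.second));
    }
    return 0;
}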
6 changes: 3 additions & 3 deletions include/wrench/services/compute/batch/BatchJob.h
@@ -33,8 +33,8 @@ namespace wrench {
unsigned long getRequestedNumNodes() const;
std::shared_ptr<CompoundJob> getCompoundJob();
void setEndingTimestamp(double time_stamp);
std::map<std::string, std::tuple<unsigned long, double>> getResourcesAllocated();
void setAllocatedResources(const std::map<std::string, std::tuple<unsigned long, double>> &resources);
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> getResourcesAllocated();
void setAllocatedResources(const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &resources);

/**
* @brief Set the indices of the allocated nodes
@@ -68,7 +68,7 @@ namespace wrench {
double begin_time_stamp;
double ending_time_stamp;
double arrival_time_stamp;
std::map<std::string, std::tuple<unsigned long, double>> resources_allocated;
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> resources_allocated;

std::vector<int> allocated_node_indices;

@@ -50,7 +50,7 @@ namespace wrench {
* @param ram_per_node: amount of RAM
* @return a host:<core,RAM> map
*/
virtual std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) = 0;
virtual std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) = 0;
};

/***********************/
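The virtual scheduleOnHosts() hook above now returns allocations keyed by Host* as well. Below is a hypothetical, free-standing first-fit sketch of such an allocation, not WRENCH's actual batch scheduling logic; it only assumes an availability map keyed by Host*, which is what the batch service now maintains:

#include <simgrid/s4u.hpp>
#include <map>
#include <tuple>

// First-fit: pick num_nodes hosts that each still have cores_per_node free cores.
// Returns an empty map when the request cannot be satisfied.
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>>
firstFitScheduleOnHosts(const std::map<simgrid::s4u::Host *, unsigned long> &available_nodes_to_cores,
                        unsigned long num_nodes, unsigned long cores_per_node, double ram_per_node) {
    std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> allocation;
    for (const auto &entry : available_nodes_to_cores) {
        if (allocation.size() == num_nodes) {
            break;
        }
        if (entry.second >= cores_per_node) {
            allocation[entry.first] = std::make_tuple(cores_per_node, ram_per_node);
        }
    }
    if (allocation.size() < num_nodes) {
        return {};  // not enough nodes with the requested core count
    }
    return allocation;
}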
@@ -37,7 +37,7 @@ namespace wrench {

void compactSchedule();

std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long, unsigned long, double) override;
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long, unsigned long, double) override;

std::map<std::string, double>
getStartTimeEstimates(std::set<std::tuple<std::string, unsigned long, unsigned long, double>> set_of_jobs) override;
@@ -37,7 +37,7 @@ namespace wrench {

void compactSchedule();

std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long, unsigned long, double) override;
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long, unsigned long, double) override;

std::map<std::string, double>
getStartTimeEstimates(std::set<std::tuple<std::string, unsigned long, unsigned long, double>> set_of_jobs) override;
@@ -41,7 +41,7 @@ namespace wrench {

std::shared_ptr<BatchJob> pickNextJobToSchedule();

std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long, unsigned long, double) override;
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> scheduleOnHosts(unsigned long, unsigned long, double) override;

std::map<std::string, double> getStartTimeEstimates(std::set<std::tuple<std::string, unsigned long, unsigned long, double>> set_of_jobs) override;
};
@@ -258,8 +258,7 @@ namespace wrench {
scratch_space_mount_point) {
std::map<std::string, std::tuple<unsigned long, double>> specified_compute_resources;
for (const auto &h: compute_hosts) {
specified_compute_resources.insert(
std::make_pair(h, std::make_tuple(ComputeService::ALL_CORES, ComputeService::ALL_RAM)));
specified_compute_resources[h] = std::make_tuple(ComputeService::ALL_CORES, ComputeService::ALL_RAM);
}

initiateInstance(hostname,
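The hunk above replaces map::insert(std::make_pair(...)) with operator[], a terser idiom that behaves identically here as long as the host list contains no duplicate names (insert keeps an existing entry, operator[] overwrites it). A small stand-alone illustration of the equivalence; kAllCores and kAllRam stand in for the ComputeService::ALL_CORES and ComputeService::ALL_RAM placeholders used in the diff:

#include <cassert>
#include <cfloat>
#include <climits>
#include <map>
#include <string>
#include <tuple>
#include <vector>

int main() {
    const unsigned long kAllCores = ULONG_MAX;  // stand-in for ComputeService::ALL_CORES
    const double kAllRam = DBL_MAX;             // stand-in for ComputeService::ALL_RAM

    std::vector<std::string> compute_hosts = {"Host1", "Host2"};
    std::map<std::string, std::tuple<unsigned long, double>> a, b;
    for (const auto &h : compute_hosts) {
        a.insert(std::make_pair(h, std::make_tuple(kAllCores, kAllRam)));  // old style
        b[h] = std::make_tuple(kAllCores, kAllRam);                        // new style
    }
    assert(a == b);  // identical contents when keys are unique
    return 0;
}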
73 changes: 44 additions & 29 deletions src/wrench/services/compute/batch/BatchComputeService.cpp
File mode changed: 100755 → 100644
@@ -41,7 +41,7 @@ WRENCH_LOG_CATEGORY(wrench_core_batch_service, "Log category for Batch Service")

namespace wrench {
// Do not remove
BatchComputeService::~BatchComputeService() {}
BatchComputeService::~BatchComputeService() = default;

/**
* @brief Constructor
@@ -101,40 +101,50 @@ namespace wrench {
"BatchComputeService::BatchComputeService(): at least one compute hosts must be provided");
}

// Get the hosts
for (const auto &h: compute_hosts) {
this->compute_hosts.push_back(S4U_Simulation::get_host_or_vm_by_name(h));
}

// for (const auto &h : this->compute_hosts) {
// std::cerr << "==> " << h->get_name() << "\n";
// }

// Check Platform homogeneity
double num_cores_available = (double) (Simulation::getHostNumCores(*(compute_hosts.begin())));
double speed = Simulation::getHostFlopRate(*(compute_hosts.begin()));
double ram_available = Simulation::getHostMemoryCapacity(*(compute_hosts.begin()));
auto first_host = *(this->compute_hosts.begin());
auto num_cores_available = (first_host->get_core_count());
double speed = first_host->get_speed();
double ram_available = S4U_Simulation::getHostMemoryCapacity(first_host);

for (auto const &h: compute_hosts) {
for (auto const &h: this->compute_hosts) {
// Compute speed
if (std::abs(speed - Simulation::getHostFlopRate(h)) > DBL_EPSILON) {
if (std::abs(speed - h->get_speed()) > DBL_EPSILON) {
throw std::invalid_argument(
"BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need "
"to be homogeneous (different flop rates detected)");
}
// RAM
if (std::abs(ram_available - Simulation::getHostMemoryCapacity(h)) > DBL_EPSILON) {
if (std::abs(ram_available - S4U_Simulation::getHostMemoryCapacity(h)) > DBL_EPSILON) {
throw std::invalid_argument(
"BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need "
"to be homogeneous (different RAM capacities detected)");
}
// Num cores
if ((double) (Simulation::getHostNumCores(h)) != num_cores_available) {
if ((double) (h->get_core_count()) != num_cores_available) {
throw std::invalid_argument(
"BatchComputeService::BatchComputeService(): Compute hosts for a BatchComputeService service need "
"to be homogeneous (different RAM capacities detected)");
}
}


//create a map for host to cores
int i = 0;
for (const auto &h: compute_hosts) {
this->nodes_to_cores_map.insert({h, num_cores_available});
this->available_nodes_to_cores.insert({h, num_cores_available});
for (const auto &h: this->compute_hosts) {
this->nodes_to_cores_map[h] = num_cores_available;
this->available_nodes_to_cores[h] = num_cores_available;
this->host_id_to_names[i++] = h;
}
this->compute_hosts = compute_hosts;

this->num_cores_per_node = this->nodes_to_cores_map.begin()->second;
this->total_num_of_nodes = compute_hosts.size();
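The constructor now resolves the hostname strings to Host* once, up front, and queries SimGrid directly for per-host characteristics when checking that the nodes are homogeneous. A minimal sketch of that pattern under a loaded s4u platform; it uses plain simgrid::s4u::Host::by_name() rather than WRENCH's get_host_or_vm_by_name() wrapper, and checks only flop rate and core count (the RAM check goes through WRENCH's S4U_Simulation wrapper in the real code):

#include <simgrid/s4u.hpp>
#include <cfloat>
#include <cmath>
#include <stdexcept>
#include <string>
#include <vector>

// Resolve names to Host* and verify that all hosts have the same flop rate
// and core count, as the BatchComputeService constructor requires.
std::vector<simgrid::s4u::Host *> resolveHomogeneousHosts(const std::vector<std::string> &names) {
    if (names.empty()) {
        throw std::invalid_argument("at least one compute host must be provided");
    }
    std::vector<simgrid::s4u::Host *> hosts;
    for (const auto &name : names) {
        hosts.push_back(simgrid::s4u::Host::by_name(name));  // throws if the host is unknown
    }
    double speed = hosts.front()->get_speed();
    int cores = hosts.front()->get_core_count();
    for (auto *h : hosts) {
        if (std::abs(h->get_speed() - speed) > DBL_EPSILON || h->get_core_count() != cores) {
            throw std::invalid_argument("compute hosts must be homogeneous");
        }
    }
    return hosts;
}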
@@ -480,11 +490,11 @@ namespace wrench {
/**
* @brief Increase resource availabilities based on freed resources
* @param resources: a set of tuples as follows:
* - hostname (string)
* - host
* - number of cores (unsigned long)
* - bytes of RAM (double)
*/
void BatchComputeService::freeUpResources(const std::map<std::string, std::tuple<unsigned long, double>> &resources) {
void BatchComputeService::freeUpResources(const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &resources) {
for (auto r: resources) {
this->available_nodes_to_cores[r.first] += std::get<0>(r.second);
}
@@ -800,9 +810,9 @@ namespace wrench {

if ((requested_hosts > this->available_nodes_to_cores.size()) or
(requested_num_cores_per_host >
Simulation::getHostNumCores(this->available_nodes_to_cores.begin()->first)) or
this->available_nodes_to_cores.begin()->first->get_core_count()) or
(required_ram_per_host >
Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first))) {
S4U_Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first))) {
{
S4U_Mailbox::dputMessage(
answer_mailbox,
@@ -972,7 +982,7 @@ namespace wrench {
* @param cores_per_node_asked_for
*/
void BatchComputeService::startJob(
const std::map<std::string, std::tuple<unsigned long, double>> &resources,
const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &resources,
const std::shared_ptr<CompoundJob> &compound_job,
const std::shared_ptr<BatchJob> &batch_job, unsigned long num_nodes_allocated,
unsigned long allocated_time,
@@ -982,11 +992,16 @@
num_nodes_allocated, cores_per_node_asked_for);

compound_job->pushCallbackMailbox(this->mailbox);

std::map<std::string, std::tuple<unsigned long, double>> resources_by_hostname;
for (auto const &h: resources) {
resources_by_hostname[h.first->get_name()] = h.second;
}
auto executor = std::shared_ptr<BareMetalComputeServiceOneShot>(
new BareMetalComputeServiceOneShot(
compound_job,
this->hostname,
resources,
resources_by_hostname,
{{BareMetalComputeServiceProperty::THREAD_STARTUP_OVERHEAD,
this->getPropertyValueAsString(
BatchComputeServiceProperty::THREAD_STARTUP_OVERHEAD)}},
@@ -1032,34 +1047,34 @@

} else if (key == "num_cores") {
for (const auto &h: this->nodes_to_cores_map) {
dict.insert(std::make_pair(h.first, (double) (h.second)));
dict.insert(std::make_pair(h.first->get_name(), (double) (h.second)));
}

} else if (key == "num_idle_cores") {
// Num idle cores per hosts
for (const auto &h: this->available_nodes_to_cores) {
dict.insert(std::make_pair(h.first, (double) (h.second)));
dict.insert(std::make_pair(h.first->get_name(), (double) (h.second)));
}

} else if (key == "flop_rates") {
// Flop rate per host
for (const auto &h: this->nodes_to_cores_map) {
dict.insert(std::make_pair(h.first, S4U_Simulation::getHostFlopRate(h.first)));
dict.insert(std::make_pair(h.first->get_name(), h.first->get_speed()));
}

} else if (key == "ram_capacities") {
// RAM capacity per host
for (const auto &h: this->nodes_to_cores_map) {
dict.insert(std::make_pair(h.first, S4U_Simulation::getHostMemoryCapacity(h.first)));
dict.insert(std::make_pair(h.first->get_name(), S4U_Simulation::getHostMemoryCapacity(h.first)));
}

} else if (key == "ram_availabilities") {
// RAM availability per host (0 if something is running, full otherwise)
for (const auto &h: this->available_nodes_to_cores) {
if (h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) {
dict.insert(std::make_pair(h.first, 0.0));
if ((double) h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) {
dict.insert(std::make_pair(h.first->get_name(), 0.0));
} else {
dict.insert(std::make_pair(h.first, S4U_Simulation::getHostMemoryCapacity(h.first)));
dict.insert(std::make_pair(h.first->get_name(), S4U_Simulation::getHostMemoryCapacity(h.first)));
}
}
} else {
@@ -1285,9 +1300,9 @@ namespace wrench {
unsigned long time_in_seconds = batch_job->getRequestedTime();
unsigned long cores_per_node_asked_for = batch_job->getRequestedCoresPerNode();

std::map<std::string, std::tuple<unsigned long, double>> resources = {};
std::vector<std::string> hosts_assigned = {};
std::map<std::string, unsigned long>::iterator it;
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> resources = {};
std::vector<simgrid::s4u::Host *> hosts_assigned = {};
std::map<simgrid::s4u::Host *, unsigned long>::iterator it;

for (auto node: node_resources) {
double ram_capacity = S4U_Simulation::getHostMemoryCapacity(
@@ -1388,7 +1403,7 @@ namespace wrench {
}

// Double check that memory requirements of all tasks can be met
if (job->getMinimumRequiredMemory() > Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first)) {
if (job->getMinimumRequiredMemory() > S4U_Simulation::getHostMemoryCapacity(this->available_nodes_to_cores.begin()->first)) {
throw ExecutionException(std::make_shared<NotEnoughResources>(job, this->getSharedPtr<ComputeService>()));
}
}
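Several interfaces used in the file above still expect hostname-keyed maps (the one-shot bare-metal service constructor, the getResourceInformation() dictionaries), so the Host*-keyed state is converted at those boundaries with get_name(). A minimal stand-alone sketch of that conversion, mirroring the resources_by_hostname loop in startJob():

#include <simgrid/s4u.hpp>
#include <map>
#include <string>
#include <tuple>

// Convert a Host*-keyed allocation into the hostname-keyed form still expected
// by interfaces that have not been converted to Host* yet.
std::map<std::string, std::tuple<unsigned long, double>>
byHostname(const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &resources) {
    std::map<std::string, std::tuple<unsigned long, double>> resources_by_hostname;
    for (const auto &entry : resources) {
        resources_by_hostname[entry.first->get_name()] = entry.second;
    }
    return resources_by_hostname;
}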
6 changes: 3 additions & 3 deletions src/wrench/services/compute/batch/BatchJob.cpp
@@ -171,15 +171,15 @@ namespace wrench {
* @brief Get the resources allocated to this BatchComputeService job
* @return a list of resource, each as a <hostname, number of cores, bytes of RAM> tuple
*/
std::map<std::string, std::tuple<unsigned long, double>> BatchJob::getResourcesAllocated() {
std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> BatchJob::getResourcesAllocated() {
return this->resources_allocated;
}

/**
* @brief Set the resources allocated to this BatchComputeService job
* @param resources: a list of resource, each as a <hostname, number of cores, bytes of RAM> tuple
* @param resources: a list of resource, each as a <host, number of cores, bytes of RAM> tuple
*/
void BatchJob::setAllocatedResources(const std::map<std::string, std::tuple<unsigned long, double>> &resources) {
void BatchJob::setAllocatedResources(const std::map<simgrid::s4u::Host *, std::tuple<unsigned long, double>> &resources) {
if (resources.empty()) {
throw std::invalid_argument(
"BatchJob::setAllocatedResources(): Empty Resources allocated");
5 changes: 3 additions & 2 deletions src/wrench/services/compute/batch/BatschedNetworkListener.cpp
File mode changed: 100755 → 100644
@@ -127,7 +127,7 @@ namespace wrench {

zmq::message_t request(strlen(this->data_to_send.c_str()));
memcpy(request.data(), this->data_to_send.c_str(), strlen(this->data_to_send.c_str()));
socket.send(request);
socket.send(request, zmq::send_flags::none);

// Get the reply.
zmq::message_t reply;
@@ -137,7 +137,8 @@ namespace wrench {
useconds_t trials;
for (trials = 0; trials < max_num_trials; trials++) {
usleep(100 + 100 * trials * trials);
int ret = socket.recv(&reply, ZMQ_DONTWAIT);
// int ret = socket.recv(&reply, ZMQ_DONTWAIT);
zmq::recv_result_t ret = socket.recv(reply, zmq::recv_flags::dontwait);
if (ret > 0) {
break;
}
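The hunks above move from the deprecated cppzmq overloads (send(msg) and recv(&msg, ZMQ_DONTWAIT)) to the flag-enum overloads, which return std::optional-based results instead of raw ints. A minimal stand-alone sketch of the newer API with a non-blocking receive and a back-off retry loop; the endpoint, payload, and retry count are placeholders, not the values WRENCH uses:

#include <zmq.hpp>
#include <cstring>
#include <string>
#include <unistd.h>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::req);
    socket.connect("tcp://localhost:28000");  // placeholder endpoint

    std::string data_to_send = "{\"now\": 0.0, \"events\": []}";  // placeholder payload
    zmq::message_t request(data_to_send.size());
    memcpy(request.data(), data_to_send.data(), data_to_send.size());
    // send() now takes a send_flags value and returns a zmq::send_result_t (std::optional)
    if (!socket.send(request, zmq::send_flags::none).has_value()) {
        return 1;
    }

    zmq::message_t reply;
    for (useconds_t trials = 0; trials < 10; trials++) {
        // recv() takes recv_flags and returns zmq::recv_result_t (std::optional<size_t>)
        zmq::recv_result_t ret = socket.recv(reply, zmq::recv_flags::dontwait);
        if (ret.has_value()) {  // ret.value() is the number of bytes received
            break;
        }
        usleep(100 + 100 * trials * trials);  // back off before retrying
    }
    return 0;
}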
@@ -286,11 +286,11 @@ namespace wrench {

zmq::message_t request(strlen(data_to_send.c_str()));
memcpy(request.data(), data_to_send.c_str(), strlen(data_to_send.c_str()));
socket.send(request);
socket.send(request, zmq::send_flags::none);

// Get the reply.
zmq::message_t reply;
socket.recv(&reply);
auto ret = socket.recv(reply, zmq::recv_flags::none);

// Process the reply
std::string reply_data;
@@ -515,13 +515,12 @@
int count = 0;
for (auto it = this->cs->nodes_to_cores_map.begin(); it != this->cs->nodes_to_cores_map.end(); it++) {
compute_resources_map["events"][0]["data"]["resources_data"][count]["id"] = std::to_string(count);
compute_resources_map["events"][0]["data"]["resources_data"][count]["name"] = it->first;
compute_resources_map["events"][0]["data"]["resources_data"][count]["name"] = it->first->get_name();
compute_resources_map["events"][0]["data"]["resources_data"][count]["core"] = it->second;
compute_resources_map["events"][0]["data"]["resources_data"][count++]["state"] = "idle";
}
std::string data = compute_resources_map.dump();


try {
std::shared_ptr<BatschedNetworkListener> network_listener =
std::shared_ptr<BatschedNetworkListener>(
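The last hunk above builds batsched's resource description by iterating the Host*-keyed nodes_to_cores_map and taking each resource name from get_name(). A minimal sketch of that JSON construction, assuming nlohmann::json (which the .dump() call suggests WRENCH uses) and an already-populated core map:

#include <simgrid/s4u.hpp>
#include <nlohmann/json.hpp>
#include <map>
#include <string>

// Build the "resources_data" portion of the message sent to batsched from a
// Host*-keyed core map, naming each resource after its host.
std::string makeResourcesPayload(const std::map<simgrid::s4u::Host *, unsigned long> &nodes_to_cores_map) {
    nlohmann::json msg;
    int count = 0;
    for (const auto &entry : nodes_to_cores_map) {
        msg["events"][0]["data"]["resources_data"][count]["id"] = std::to_string(count);
        msg["events"][0]["data"]["resources_data"][count]["name"] = entry.first->get_name();
        msg["events"][0]["data"]["resources_data"][count]["core"] = entry.second;
        msg["events"][0]["data"]["resources_data"][count]["state"] = "idle";
        count++;
    }
    return msg.dump();
}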