Skip to content

Commit

Permalink
Merge branch 'master' of github.com:wrench-project/wrench
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Oct 13, 2023
2 parents 052b9c6 + ca77fe4 commit 2f5cb4f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ namespace wrench {
WRENCH_WARN("LogicalFileSystem::reserveSpace(): Space was already being reserved for storing file %s at path %s:%s. "
"This is likely a redundant copy, and nothing needs to be done",
file->getID().c_str(), this->hostname.c_str(), fixed_path.c_str());
return true;
return true;
}

if (this->free_space < file->getSize()) {
Expand Down
1 change: 0 additions & 1 deletion tools/wrench/wrench-daemon/include/SimulationController.h
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ namespace wrench {
json getVMComputeService(json data);

private:

template<class T>
json startService(T *s);

Expand Down
199 changes: 99 additions & 100 deletions tools/wrench/wrench-daemon/src/SimulationController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@

WRENCH_LOG_CATEGORY(simulation_controller, "Log category for SimulationController");

#define PARSE_SERVICE_PROPERTY_LIST() WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \
{ \
json jsonData = json::parse(property_list_string); \
for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \
auto property_key = ServiceProperty::translateString(it.key()); \
service_property_list[property_key] = it.value(); \
} \
}

#define PARSE_MESSAGE_PAYLOAD_LIST() WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \
{ \
json jsonData = json::parse(message_payload_list_string); \
for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \
auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \
service_message_payload_list[message_payload_key] = it.value(); \
} \
}
#define PARSE_SERVICE_PROPERTY_LIST() \
WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \
{ \
json jsonData = json::parse(property_list_string); \
for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \
auto property_key = ServiceProperty::translateString(it.key()); \
service_property_list[property_key] = it.value(); \
} \
}

#define PARSE_MESSAGE_PAYLOAD_LIST() \
WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \
{ \
json jsonData = json::parse(message_payload_list_string); \
for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \
auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \
service_message_payload_list[message_payload_key] = it.value(); \
} \
}

namespace wrench {

Expand All @@ -50,11 +52,11 @@ namespace wrench {
const std::string &hostname, int sleep_us) : ExecutionController(hostname, "SimulationController"), workflow(workflow), sleep_us(sleep_us) {}


template <class T>
template<class T>
json SimulationController::startService(T *s) {
BlockingQueue<std::pair<bool, std::string>> s_created;

this->things_to_do.push([this, s, &s_created](){
this->things_to_do.push([this, s, &s_created]() {
try {
auto new_service_shared_ptr = this->simulation->startNewService(s);
if (auto cs = std::dynamic_pointer_cast<wrench::ComputeService>(new_service_shared_ptr)) {
Expand Down Expand Up @@ -106,10 +108,10 @@ namespace wrench {

// Main control loop
while (keep_going) {

// Starting compute and storage services that should be started, if any
while (true) {
std::function<void()> thing_to_do;
std::function<void()> thing_to_do;

if (this->things_to_do.tryPop(thing_to_do)) {
thing_to_do();
Expand All @@ -128,7 +130,7 @@ namespace wrench {
// Moves time forward if needed (because the client has done a sleep),
// And then add all events that occurred to the event queue
double time_to_sleep = std::max<double>(0, time_horizon_to_reach -
wrench::Simulation::getCurrentSimulatedDate());
wrench::Simulation::getCurrentSimulatedDate());
if (time_to_sleep > 0.0) {
WRENCH_INFO("Sleeping %.2lf seconds", time_to_sleep);
S4U_Simulation::sleep(time_to_sleep);
Expand Down Expand Up @@ -333,7 +335,6 @@ namespace wrench {
auto new_service = new CloudComputeService(hostname, resources, scratch_space,
service_property_list, service_message_payload_list);
return this->startService<wrench::ComputeService>(new_service);

}


Expand All @@ -357,7 +358,6 @@ namespace wrench {
auto new_service = new BatchComputeService(hostname, resources, scratch_space,
service_property_list, service_message_payload_list);
return this->startService<wrench::ComputeService>(new_service);

}

/**
Expand Down Expand Up @@ -386,15 +386,15 @@ namespace wrench {
BlockingQueue<std::pair<bool, std::string>> vm_created;

// Push the request into the blocking queue (will be a single one!)
this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
std::string vm_name;
try {
vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list);
vm_created.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
vm_created.push(std::pair(false, e.getCause()->toString()));
}
this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created]() {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
std::string vm_name;
try {
vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list);
vm_created.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
vm_created.push(std::pair(false, e.getCause()->toString()));
}
});

// Poll from the shared queue (will be a single one!)
Expand Down Expand Up @@ -428,20 +428,20 @@ namespace wrench {

BlockingQueue<std::pair<bool, std::string>> vm_started;
// Push the request into the blocking queue (will be a single one!)
this->things_to_do.push([this, vm_name, cs, &vm_started](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot start VM because it's not down");
}
auto bm_cs = cloud_cs->startVM(vm_name);
this->compute_service_registry.insert(bm_cs->getName(), bm_cs);
vm_started.push(std::pair(true, bm_cs->getName()));
} catch (ExecutionException &e) {
vm_started.push(std::pair(false, e.getCause()->toString()));
} catch (std::invalid_argument &e) {
vm_started.push(std::pair(false, e.what()));
}
this->things_to_do.push([this, vm_name, cs, &vm_started]() {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot start VM because it's not down");
}
auto bm_cs = cloud_cs->startVM(vm_name);
this->compute_service_registry.insert(bm_cs->getName(), bm_cs);
vm_started.push(std::pair(true, bm_cs->getName()));
} catch (ExecutionException &e) {
vm_started.push(std::pair(false, e.getCause()->toString()));
} catch (std::invalid_argument &e) {
vm_started.push(std::pair(false, e.what()));
}
});

// Poll from the shared queue (will be a single one!)
Expand Down Expand Up @@ -477,22 +477,22 @@ namespace wrench {

// Push the request into the blocking queue (will be a single one!)
//this->vm_to_shutdown.push(std::pair(vm_name, cs));
this->things_to_do.push([this, vm_name, cs, &vm_shutdown](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMRunning(vm_name)) {
throw std::invalid_argument("Cannot shutdown VM because it's not running");
}
auto bm_cs = cloud_cs->getVMComputeService(vm_name);

this->compute_service_registry.remove(bm_cs->getName());
cloud_cs->shutdownVM(vm_name);
vm_shutdown.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
vm_shutdown.push(std::pair(false, e.what()));
} catch (std::invalid_argument &e) {
vm_shutdown.push(std::pair(false, e.what()));
}
this->things_to_do.push([this, vm_name, cs, &vm_shutdown]() {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMRunning(vm_name)) {
throw std::invalid_argument("Cannot shutdown VM because it's not running");
}
auto bm_cs = cloud_cs->getVMComputeService(vm_name);

this->compute_service_registry.remove(bm_cs->getName());
cloud_cs->shutdownVM(vm_name);
vm_shutdown.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
vm_shutdown.push(std::pair(false, e.what()));
} catch (std::invalid_argument &e) {
vm_shutdown.push(std::pair(false, e.what()));
}
});

// Poll from the shared queue (will be a single one!)
Expand Down Expand Up @@ -525,17 +525,17 @@ namespace wrench {
BlockingQueue<std::pair<bool, std::string>> vm_destroyed;

// Push the request into the blocking queue (will be a single one!)
this->things_to_do.push([vm_name, cs, &vm_destroyed](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot destroy VM because it's not down");
}
cloud_cs->destroyVM(vm_name);
vm_destroyed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_destroyed.push(std::pair(false, e.what()));
}
this->things_to_do.push([vm_name, cs, &vm_destroyed]() {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot destroy VM because it's not down");
}
cloud_cs->destroyVM(vm_name);
vm_destroyed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_destroyed.push(std::pair(false, e.what()));
}
});

// Poll from the shared queue (will be a single one!)
Expand All @@ -562,7 +562,6 @@ namespace wrench {
// Create the new service
auto new_service = SimpleStorageService::createSimpleStorageService(head_host, mount_points, {}, {});
return this->startService<wrench::StorageService>(new_service);

}

/**
Expand Down Expand Up @@ -616,13 +615,13 @@ namespace wrench {
BlockingQueue<std::tuple<bool, bool, std::string>> file_looked_up;

// Push the request into the blocking queue (will be a single one!)
this->things_to_do.push([file, ss, &file_looked_up](){
try {
bool result = ss->lookupFile(file);
file_looked_up.push(std::tuple(true, result, ""));
} catch (std::invalid_argument &e) {
file_looked_up.push(std::tuple(false, false, e.what()));
}
this->things_to_do.push([file, ss, &file_looked_up]() {
try {
bool result = ss->lookupFile(file);
file_looked_up.push(std::tuple(true, result, ""));
} catch (std::invalid_argument &e) {
file_looked_up.push(std::tuple(false, false, e.what()));
}
});

// Poll from the shared queue (will be a single one!)
Expand Down Expand Up @@ -708,7 +707,7 @@ namespace wrench {
}

BlockingQueue<std::pair<bool, std::string>> job_submitted;
this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted](){
this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted]() {
try {
WRENCH_INFO("Submitting a job...");
this->job_manager->submitJob(job, cs, service_specific_args);
Expand Down Expand Up @@ -1036,14 +1035,14 @@ namespace wrench {
// Push the request into the blocking queue (will be a single one!)
//this->vm_to_suspend.push(std::pair(vm_name, cs));
BlockingQueue<std::pair<bool, std::string>> vm_suspended;
this->things_to_do.push([vm_name, cs, &vm_suspended](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
cloud_cs->suspendVM(vm_name);
vm_suspended.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_suspended.push(std::pair(false, e.what()));
}
this->things_to_do.push([vm_name, cs, &vm_suspended]() {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
cloud_cs->suspendVM(vm_name);
vm_suspended.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_suspended.push(std::pair(false, e.what()));
}
});

// Poll from the shared queue (will be a single one!)
Expand Down Expand Up @@ -1097,14 +1096,14 @@ namespace wrench {
BlockingQueue<std::pair<bool, std::string>> vm_resumed;

// Push the request into the blocking queue (will be a single one!)
this->things_to_do.push([vm_name, cs, &vm_resumed](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
cloud_cs->resumeVM(vm_name);
vm_resumed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_resumed.push(std::pair(false, e.what()));
}
this->things_to_do.push([vm_name, cs, &vm_resumed]() {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
cloud_cs->resumeVM(vm_name);
vm_resumed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_resumed.push(std::pair(false, e.what()));
}
});

// Poll from the shared queue (will be a single one!)
Expand All @@ -1131,7 +1130,7 @@ namespace wrench {
}
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
std::vector<std::string> execution_hosts_list = cloud_cs->getHosts();
json answer {};
json answer{};
answer["execution_hosts"] = execution_hosts_list;
return answer;
}
Expand Down

0 comments on commit 2f5cb4f

Please sign in to comment.