Skip to content

Commit

Permalink
(#157)
Browse files Browse the repository at this point in the history
Added a virtual cluster example
  • Loading branch information
henricasanova committed Apr 21, 2020
1 parent 1e8c86d commit 24f72b6
Show file tree
Hide file tree
Showing 14 changed files with 459 additions and 17 deletions.
1 change: 1 addition & 0 deletions conf/cmake/Examples.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(EXAMPLES_CMAKEFILES_TXT
examples/basic-examples/bare-metal-bag-of-tasks/CMakeLists.txt
examples/basic-examples/bare-metal-complex-job/CMakeLists.txt
examples/basic-examples/cloud-bag-of-tasks/CMakeLists.txt
examples/basic-examples/virtualized-cluster-bag-of-tasks/CMakeLists.txt
examples/basic-examples/batch-bag-of-tasks/CMakeLists.txt
examples/basic-examples/batch-pilot-job/CMakeLists.txt
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ int main(int argc, char **argv) {
"ComputeHost", {"ComputeHost"}, "", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in TwoTasksAtATimeWMS.cpp
* for more details */
* for executing the workflow. */

auto wms = simulation.add(
new wrench::TwoTasksAtATimeWMS({baremetal_service}, {storage_service}, "WMSHost"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ int main(int argc, char **argv) {
"ComputeHost", {"ComputeHost"}, "/scratch/", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in OneTaskAtATimeWMS.cpp
* for more details */
* for executing the workflow */
auto wms = simulation.add(
new wrench::WorkflowAsAsingleJobWMS({baremetal_service}, {storage_service}, "WMSHost"));

Expand Down
3 changes: 1 addition & 2 deletions examples/basic-examples/bare-metal-chain/BareMetalChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ int main(int argc, char **argv) {
"ComputeHost", {"ComputeHost"}, "", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in OneTaskAtATimeWMS.cpp
* for more details */
* for executing the workflow. */

auto wms = simulation.add(
new wrench::OneTaskAtATimeWMS({baremetal_service}, {storage_service}, "WMSHost"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ int main(int argc, char **argv) {
"ComputeHost", {"ComputeHost"}, "", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in OneTaskAtATimeWMS.cpp
* for more details */
* for executing the workflow. */

auto wms = simulation.add(
new wrench::ComplexJobWMS({baremetal_service}, {storage_service1, storage_service2}, "WMSHost"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ int main(int argc, char **argv) {
"BatchHeadNode", batch_nodes, "", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in TwoTasksAtATimeWMS.cpp
* for more details */
* for executing the workflow. */
auto wms = simulation.add(
new wrench::TwoTasksAtATimeBatchWMS({batch_service}, {storage_service}, "WMSHost"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace wrench {
}

/* Initialize and seed a RNG */
std::uniform_int_distribution<int> dist(0, 1000000000);
std::uniform_int_distribution<long> dist(0, 1000000000);
std::mt19937 rng(42);

/* While the workflow isn't done, repeat the main loop */
Expand Down
3 changes: 1 addition & 2 deletions examples/basic-examples/batch-pilot-job/BatchPilotJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ int main(int argc, char **argv) {
"BatchHeadNode", batch_nodes, "/scratch/", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in PilotJobWMS.cpp
* for more details */
* for executing the workflow. */
auto wms = simulation.add(
new wrench::PilotJobWMS({batch_service}, {storage_service}, "WMSHost"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int main(int argc, char **argv) {

/* Parsing of the command-line arguments for this WRENCH simulation */
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <and EVEN number of tasks> <xml platform file> [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl;
std::cerr << "Usage: " << argv[0] << " <an EVEN number of tasks> <xml platform file> [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl;
exit(1);
}

Expand All @@ -80,7 +80,7 @@ int main(int argc, char **argv) {
wrench::Workflow workflow;

/* Initialize and seed a RNG */
std::uniform_int_distribution<double> dist(100000000,10000000000);
std::uniform_int_distribution<long> dist(100000000,10000000000);
std::mt19937 rng(42);

/* Add workflow tasks and files */
Expand Down Expand Up @@ -117,8 +117,7 @@ int main(int argc, char **argv) {
"CloudProviderHost", cloud_hosts, "", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in TwoTasksAtATimeWMS.cpp
* for more details */
* for executing the workflow. */

auto wms = simulation.add(
new wrench::TwoTasksAtATimeCloudWMS({cloud_service}, {storage_service}, "WMSHost"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

set(SOURCE_FILES
TwoTasksAtATimeVirtualizedClusterWMS.h
TwoTasksAtATimeVirtualizedClusterWMS.cpp
VirtualizedClusterBagOfTasks.cpp
)

add_executable(wrench-virtualized-cluster-bag-of-tasks-simulator ${SOURCE_FILES})

target_link_libraries(wrench-virtualized-cluster-bag-of-tasks-simulator wrench ${SimGrid_LIBRARY} ${PUGIXML_LIBRARY} ${LEMON_LIBRARY})

install(TARGETS wrench-virtualized-cluster-bag-of-tasks-simulator DESTINATION bin)

Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

/**
** A Workflow Management System (WMS) implementation that operates as follows:
** - Create two VMs, each one running on its own host
** - While the workflow is not done, repeat:
** - Pick up to two ready tasks
** - Submit both of them as two different jobs to the VMs
** - The most expensive task on the more powerful VM
** - The least expensive task on the less powerful VM
** - Each task reads the input file from the StorageService
** - Each task reads the output file from the StorageService
** - After a while, migrate one VM
**/

#include <iostream>

#include "TwoTasksAtATimeVirtualizedClusterWMS.h"

#define GB 1000000000

XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for TwoTasksAtATimeVirtualizedClusterWMS");

namespace wrench {

/**
* @brief Constructor, which calls the super constructor
*
* @param compute_services: a set of compute services available to run tasks
* @param storage_services: a set of storage services available to store files
* @param hostname: the name of the host on which to start the WMS
*/
TwoTasksAtATimeVirtualizedClusterWMS::TwoTasksAtATimeVirtualizedClusterWMS(const std::set<std::shared_ptr<ComputeService>> &compute_services,
const std::set<std::shared_ptr<StorageService>> &storage_services,
const std::string &hostname) : WMS(
nullptr, nullptr,
compute_services,
storage_services,
{}, nullptr,
hostname,
"two-tasks-at-a-time-virtualized-cluster") {}

/**
* @brief main method of the TwoTasksAtATimeVirtualizedClusterWMS daemon
*
* @return 0 on completion
*
* @throw std::runtime_error
*/
int TwoTasksAtATimeVirtualizedClusterWMS::main() {

/* Set the logging output to GREEN */
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN);

WRENCH_INFO("WMS starting on host %s", Simulation::getHostName().c_str());
WRENCH_INFO("About to execute a workflow with %lu tasks", this->getWorkflow()->getNumberOfTasks());

/* Create a job manager so that we can create/submit jobs */
auto job_manager = this->createJobManager();

/* Get the first available bare-metal compute service and storage service */
auto virtualized_cluster_service = *(this->getAvailableComputeServices<VirtualizedClusterComputeService>().begin());
auto storage_service = *(this->getAvailableStorageServices().begin());

/* Create a VM instance with 5 cores and one with 2 cores (and 500M of RAM) */
WRENCH_INFO("Creating a 'large' VM with 5 cores and a 'small' VM with 2 cores, both of them with 5GB RAM");
auto large_vm = virtualized_cluster_service->createVM(5, 5 * GB);
auto small_vm = virtualized_cluster_service->createVM(2, 5 * GB);

/* Start the VMs */
WRENCH_INFO("Start the large VM on host VirtualizedClusterHost1");
auto large_vm_compute_service = virtualized_cluster_service->startVM(large_vm, "VirtualizedClusterHost1");
WRENCH_INFO("Start the small VM on host VirtualizedClusterHost2");
auto small_vm_compute_service = virtualized_cluster_service->startVM(small_vm, "VirtualizedClusterHost2");

/* While the workflow isn't done, repeat the main loop */
while (not this->getWorkflow()->isDone()) {

/* Get the ready tasks */
auto ready_tasks = this->getWorkflow()->getReadyTasks();

/* Sort them by flops */
std::sort(ready_tasks.begin(), ready_tasks.end(),
[](const WorkflowTask *t1, const WorkflowTask *t2) -> bool {

if (t1->getFlops() == t2->getFlops()) {
return ((uintptr_t) t1 > (uintptr_t) t2);
} else {
return (t1->getFlops() < t2->getFlops());
}
});

/* Pick the least and most expensive task */
auto cheap_ready_task = ready_tasks.at(0);
auto expensive_ready_task = ready_tasks.at(ready_tasks.size() - 1);

/* Submit the cheap task to the small VM */
/* First, we need to create a map of file locations, stating for each file
* where is should be read/written */
std::map<WorkflowFile *, std::shared_ptr<FileLocation>> file_locations1;
file_locations1[cheap_ready_task->getInputFiles().at(0)] = FileLocation::LOCATION(storage_service);
file_locations1[cheap_ready_task->getOutputFiles().at(0)] = FileLocation::LOCATION(storage_service);

/* Create the job */
WRENCH_INFO("Creating a job to run task %s (%.2lf)",
cheap_ready_task->getID().c_str(), cheap_ready_task->getFlops());

auto standard_job1 = job_manager->createStandardJob(cheap_ready_task, file_locations1);

/* Submit the job to the small VM */
WRENCH_INFO("Submit this job to the small VM");
job_manager->submitJob(standard_job1, small_vm_compute_service);

/* Submit the expensive task to the large VM */
/* First, we need to create a map of file locations, stating for each file
* where is should be read/written */
std::map<WorkflowFile *, std::shared_ptr<FileLocation>> file_locations2;
file_locations2[expensive_ready_task->getInputFiles().at(0)] = FileLocation::LOCATION(storage_service);
file_locations2[expensive_ready_task->getOutputFiles().at(0)] = FileLocation::LOCATION(storage_service);

/* Create the job */
WRENCH_INFO("Creating a job to run task %s (%.2lf)",
expensive_ready_task->getID().c_str(), expensive_ready_task->getFlops());

auto standard_job2 = job_manager->createStandardJob(expensive_ready_task, file_locations2);

/* Submit the job to the large VM */
WRENCH_INFO("Submit this job to the large VM");
job_manager->submitJob(standard_job2, large_vm_compute_service);

/* Sleeping for 10 seconds */
Simulation::sleep(10);

/* Migrate the VM */
WRENCH_INFO("Migrating the small VM from VirtualizedClusterHost1 to VirtualizedClusterHost2");
virtualized_cluster_service->migrateVM(small_vm, "VirtualizedClusterHost2");
WRENCH_INFO("VM Migrated!");

/* Wait for workflow execution event and process it. In this case we know that
* the event will be a StandardJobCompletionEvent, which is processed by the method
* processEventStandardJobCompletion() that this class overrides. */
WRENCH_INFO("Wait for next event");
this->waitForAndProcessNextEvent();

/* And again! */
WRENCH_INFO("Wait for next event again");
this->waitForAndProcessNextEvent();
}

WRENCH_INFO("Workflow execution complete");
return 0;
}

/**
* @brief Process a standard job completion event
*
* @param event: the event
*/
void TwoTasksAtATimeVirtualizedClusterWMS::processEventStandardJobCompletion(std::shared_ptr<StandardJobCompletedEvent> event) {
/* Retrieve the job that this event is for */
auto job = event->standard_job;
/* Retrieve the job's first (and in our case only) task */
auto task = job->getTasks().at(0);
WRENCH_INFO("Notified that a standard job has completed task %s", task->getID().c_str());
}

/**
* @brief Process a standard job failure event
*
* @param event: the event
*/
void TwoTasksAtATimeVirtualizedClusterWMS::processEventStandardJobFailure(std::shared_ptr<StandardJobFailedEvent> event) {
/* Retrieve the job that this event is for */
auto job = event->standard_job;
/* Retrieve the job's first (and in our case only) task */
auto task = job->getTasks().at(0);
/* Print some error message */
WRENCH_INFO("Notified that a standard job has failed for task %s with error %s",
task->getID().c_str(),
event->failure_cause->toString().c_str());
throw std::runtime_error("ABORTING DUE TO JOB FAILURE");
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/


#ifndef WRENCH_TWO_TASKS_AT_A_TIME_VIRTUALIZED_CLUSTER_H
#define WRENCH_TWO_TASKS_AT_A_TIME_VIRTUALIZED_CLUSTER_H

#include <wrench-dev.h>


namespace wrench {

class Simulation;

/**
* @brief A Workflow Management System (WMS) implementation (inherits from WMS)
*/
class TwoTasksAtATimeVirtualizedClusterWMS : public WMS {

public:
// Constructor
TwoTasksAtATimeVirtualizedClusterWMS(
const std::set<std::shared_ptr<ComputeService>> &compute_services,
const std::set<std::shared_ptr<StorageService>> &storage_services,
const std::string &hostname);

protected:

// Overriden method
void processEventStandardJobCompletion(std::shared_ptr<StandardJobCompletedEvent>) override;
void processEventStandardJobFailure(std::shared_ptr<StandardJobFailedEvent>) override;

private:
// main() method of the WMS
int main() override;

};
}
#endif //WRENCH_TWO_TASKS_AT_A_TIME_VIRTUALIZED_CLUSTER_H
Loading

0 comments on commit 24f72b6

Please sign in to comment.