Skip to content

Commit

Permalink
(#157) Added a "complex job" example
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Apr 20, 2020
1 parent e659f3c commit 97d7e56
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 5 deletions.
1 change: 1 addition & 0 deletions conf/cmake/Examples.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ set(EXAMPLES_CMAKEFILES_TXT
examples/basic-examples/bare-metal-chain/CMakeLists.txt
examples/basic-examples/bare-metal-chain-scratch/CMakeLists.txt
examples/basic-examples/bare-metal-bag-of-tasks/CMakeLists.txt
examples/basic-examples/bare-metal-complex-job/CMakeLists.txt
)

foreach (cmakefile ${EXAMPLES_CMAKEFILES_TXT})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
** the completion time of each workflow task is printed.
**
** Example invocation of the simulator for a 10-task workflow, with only WMS logging:
** ./bare-metal-bag-of-tasks-simulator 10 ./two_hosts.xml --wrench-no-logs --log=one_task_at_a_time_wms.threshold=info
** ./bare-metal-bag-of-tasks-simulator 10 ./two_hosts.xml --wrench-no-logs --log=two_tasks_at_a_time_wms.threshold=info
**
** Example invocation of the simulator for a 5-task workflow with full logging:
** ./bare-metal-bag-of-tasks-simulator 5 ./two_hosts.xml
Expand Down Expand Up @@ -108,7 +108,7 @@ int main(int argc, char **argv) {
* to a software infrastructure that can execute tasks on hardware resources.
* This particular service is started on ComputeHost and has no scratch storage space (mount point argument = "").
* This means that tasks running on this service will access data only from remote storage services. */
std::cerr << "Instantiating a BareMetalComputeService on WMSHost..." << std::endl;
std::cerr << "Instantiating a BareMetalComputeService on ComputeHost..." << std::endl;
auto baremetal_service = simulation.add(new wrench::BareMetalComputeService(
"ComputeHost", {"ComputeHost"}, "", {}, {}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace wrench {
storage_services,
{}, nullptr,
hostname,
"one-task-at-a-time") {}
"two-tasks-at-a-time") {}

/**
* @brief main method of the TwoTasksAtATimeWMS daemon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ int main(int argc, char **argv) {
* This particular service is started on ComputeHost and has scratch storage space (mount point argument = "/scratch").
* This means that the WMS can opt to leave files in scratch. However, files in scratch are removed after
* a job completes */
std::cerr << "Instantiating a BareMetalComputeService on WMSHost..." << std::endl;
std::cerr << "Instantiating a BareMetalComputeService on ComputeHost..." << std::endl;
auto baremetal_service = simulation.add(new wrench::BareMetalComputeService(
"ComputeHost", {"ComputeHost"}, "/scratch/", {}, {}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int main(int argc, char **argv) {
* to a software infrastructure that can execute tasks on hardware resources.
* This particular service is started on ComputeHost and has no scratch storage space (mount point argument = "").
* This means that tasks running on this service will access data only from remote storage services. */
std::cerr << "Instantiating a BareMetalComputeService on WMSHost..." << std::endl;
std::cerr << "Instantiating a BareMetalComputeService on ComputeHost..." << std::endl;
auto baremetal_service = simulation.add(new wrench::BareMetalComputeService(
"ComputeHost", {"ComputeHost"}, "", {}, {}));

Expand Down
12 changes: 12 additions & 0 deletions examples/basic-examples/bare-metal-complex-job/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Build description for the "bare-metal complex job" example simulator.

# Sources of the example: the WMS implementation (header + source) and the
# file that defines the simulator's main() function.
set(SOURCE_FILES
ComplexJobWMS.h
ComplexJobWMS.cpp
ComplexJob.cpp
)

# The example is built as a stand-alone executable.
add_executable(wrench-bare-metal-complex-job-simulator ${SOURCE_FILES})

# Link against the wrench library and the libraries referenced by the
# SimGrid_LIBRARY, PUGIXML_LIBRARY and LEMON_LIBRARY variables, which are
# expected to be set before this file is processed.
target_link_libraries(wrench-bare-metal-complex-job-simulator wrench ${SimGrid_LIBRARY} ${PUGIXML_LIBRARY} ${LEMON_LIBRARY})

# Install the executable into <prefix>/bin.
install(TARGETS wrench-bare-metal-complex-job-simulator DESTINATION bin)

147 changes: 147 additions & 0 deletions examples/basic-examples/bare-metal-complex-job/ComplexJob.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

/**
** This simulator simulates the execution of a one-task workflow, where the task is:
**
** {InFile #0, InFile #1} -> Task -> {OutFile #0, OutFile #1}
**
** The compute platform comprises four hosts, WMSHost, StorageHost1, StorageHost2, and ComputeHost.
** On WMSHost runs a WMS (defined in class ComplexJobWMS). On ComputeHost runs a bare metal
** compute service, that has access to the 10 cores of that host. On StorageHost1 and
** StorageHost2 run two storage services. Once the simulation is done,
** the completion time of each workflow task is printed.
**
** Example invocation of the simulator with only WMS logging:
** ./wrench-bare-metal-complex-job-simulator ./four_hosts.xml --wrench-no-logs --log=complex_job_wms.threshold=info
**
** Example invocation of the simulator with full logging:
** ./wrench-bare-metal-complex-job-simulator ./four_hosts.xml
**/


#include <iostream>
#include <wrench.h>

#include "ComplexJobWMS.h" // WMS implementation

/**
 * @brief Entry point of the complex-job example simulator
 *
 * Builds a one-task workflow (two input files, two output files), deploys two
 * storage services, one bare-metal compute service, a ComplexJobWMS and a file
 * registry service on the simulated platform, stages the input files, runs the
 * simulation, and finally prints the completion date of every completed task.
 *
 * @param argc: argument count
 * @param argv: argument array (after WRENCH/SimGrid flags are consumed, exactly
 *              one argument must remain: the XML platform file)
 * @return 0 on success, 1 if the simulation raised a std::runtime_error
 *         (the process exits with status 1 on a usage error)
 */
int main(int argc, char **argv) {

    /* The simulation object through which everything else is set up */
    wrench::Simulation simulation;

    /* Let WRENCH/SimGrid consume their own command-line flags first
     * (e.g., --help-wrench, --help-simgrid, logging options). argc/argv
     * are passed so that init() can update them. */
    simulation.init(&argc, argv);

    /* Whatever remains must be the program name plus the platform file */
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <xml platform file> [optional logging arguments]" << std::endl;
        exit(1);
    }

    /* Build the simulated platform from the SimGrid XML description */
    std::cerr << "Instantiating simulated platform..." << std::endl;
    simulation.instantiatePlatform(argv[1]);

    /* The workflow: a single task with two input files and two output files */
    wrench::Workflow workflow;
    auto the_task = workflow.addTask("task", 10000000000.0, 1, 10, 0.90, 10000000);

    auto first_input = workflow.addFile("infile_1", 100000000);
    the_task->addInputFile(first_input);
    auto second_input = workflow.addFile("infile_2", 10000000);
    the_task->addInputFile(second_input);

    auto first_output = workflow.addFile("outfile_1", 200000000);
    the_task->addOutputFile(first_output);
    auto second_output = workflow.addFile("outfile_2", 5000000);
    the_task->addOutputFile(second_output);

    /* First storage service: a SimpleStorageService started on StorageHost1,
     * using the mount point "/" and a BUFFER_SIZE property of "50000000"
     * (the buffer used to pipeline disk and network operations during transfers). */
    std::cerr << "Instantiating a SimpleStorageService on StorageHost1..." << std::endl;
    auto storage_service1 = simulation.add(new wrench::SimpleStorageService(
            "StorageHost1", {"/"}, {{wrench::SimpleStorageServiceProperty::BUFFER_SIZE, "50000000"}}, {}));

    /* Second storage service: same configuration, but on StorageHost2 */
    std::cerr << "Instantiating a SimpleStorageService on StorageHost2..." << std::endl;
    auto storage_service2 = simulation.add(new wrench::SimpleStorageService(
            "StorageHost2", {"/"}, {{wrench::SimpleStorageServiceProperty::BUFFER_SIZE, "50000000"}}, {}));

    /* Bare-metal compute service on ComputeHost. The empty scratch mount point ("")
     * means the service has no scratch space, so tasks access their data only
     * from the remote storage services. */
    std::cerr << "Instantiating a BareMetalComputeService on ComputeHost..." << std::endl;
    auto baremetal_service = simulation.add(new wrench::BareMetalComputeService(
            "ComputeHost", {"ComputeHost"}, "", {}, {}));

    /* The WMS, started on WMSHost, which drives the workflow execution.
     * See ComplexJobWMS.cpp for what it actually does. */
    auto complex_job_wms = simulation.add(
            new wrench::ComplexJobWMS({baremetal_service}, {storage_service1, storage_service2}, "WMSHost"));

    /* Hand the workflow over to the WMS */
    complex_job_wms->addWorkflow(&workflow);

    /* File registry service on WMSHost: a replica catalog of
     * <file, storage service> pairs that services can query */
    std::cerr << "Instantiating a FileRegistryService on WMSHost ..." << std::endl;
    simulation.add(new wrench::FileRegistryService("WMSHost"));

    /* Workflow input files (those not produced by any task) must exist somewhere
     * before the simulation starts: stage all of them on the first storage service */
    std::cerr << "Staging task input files..." << std::endl;
    for (auto const &input_file : workflow.getInputFiles()) {
        simulation.stageFile(input_file, storage_service1);
    }

    /* Run the simulation; launch() returns only once the simulation is over */
    std::cerr << "Launching the Simulation..." << std::endl;
    try {
        simulation.launch();
    } catch (const std::runtime_error &error) {
        std::cerr << "Exception: " << error.what() << std::endl;
        return 1;
    }
    std::cerr << "Simulation done!" << std::endl;

    /* Inspect the simulation output: retrieve the trace of task completion
     * timestamps and print one line per completed task */
    auto completions = simulation.getOutput().getTrace<wrench::SimulationTimestampTaskCompletion>();
    for (auto const &timestamp : completions) {
        std::cerr << "Task " << timestamp->getContent()->getTask()->getID()
                  << " completed at time " << timestamp->getDate() << std::endl;
    }

    return 0;
}
154 changes: 154 additions & 0 deletions examples/basic-examples/bare-metal-complex-job/ComplexJobWMS.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

/**
** A Workflow Management System (WMS) implementation that operates on a workflow
** with a single task that has two input files and two output files as follows:
** - Run the workflow as a single job that:
** - Copies the first input file from the first storage service to the second one
** - Runs the task so that it produces its output files on the second storage service
** - Copies the task's first output file to the first storage service
** - Deletes the task's second output file on the second storage service
**/

#include <iostream>

#include "ComplexJobWMS.h"

XBT_LOG_NEW_DEFAULT_CATEGORY(complex_job_wms, "Log category for ComplexJobWMS");

namespace wrench {

/**
* @brief Constructor, which only forwards its arguments to the WMS super constructor
*
* @param compute_services: a set of compute services available to run tasks
* @param storage_services: a set of storage services available to store files
* @param hostname: the name of the host on which to start the WMS
*/
ComplexJobWMS::ComplexJobWMS(const std::set<std::shared_ptr<ComputeService>> &compute_services,
const std::set<std::shared_ptr<StorageService>> &storage_services,
const std::string &hostname) : WMS(
/* NOTE(review): the two leading nullptr arguments are presumably optional
* scheduler objects that this WMS does not use -- confirm against the
* wrench::WMS constructor declaration */
nullptr, nullptr,
compute_services,
storage_services,
/* NOTE(review): "{}" and "nullptr" below are presumably an empty set of
* additional services and an absent optional service -- confirm against
* the wrench::WMS constructor declaration */
{}, nullptr,
hostname,
/* String identifying this WMS implementation (presumably used as a name suffix -- TODO confirm) */
"complex-job") {}

/**
* @brief main method of the ComplexJobWMS daemon
*
* @return 0 on completion
*
* @throw std::runtime_error
*/
int ComplexJobWMS::main() {

/* Set the logging output to GREEN */
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN);

WRENCH_INFO("WMS starting on host %s", Simulation::getHostName().c_str());
WRENCH_INFO("About to execute a workflow with %lu tasks", this->getWorkflow()->getNumberOfTasks());

/* Create a job manager so that we can create/submit jobs */
auto job_manager = this->createJobManager();

/* Get the first available bare-metal compute service */
auto compute_service = *(this->getAvailableComputeServices<BareMetalComputeService>().begin());

/* Get the first and second available storage services */
auto storage_services = this->getAvailableStorageServices();
std::shared_ptr<StorageService> storage_service1, storage_service2;
if ((*(this->getAvailableStorageServices().begin()))->getHostname() == "StorageHost1") {
storage_service1 = *(this->getAvailableStorageServices().begin());
storage_service2 = *(this->getAvailableStorageServices().begin()++);
} else {
storage_service2 = *(this->getAvailableStorageServices().begin());
storage_service1 = *(this->getAvailableStorageServices().begin()++);
}

/* Get references to the task and files */
auto task = this->getWorkflow()->getTaskByID("task");
auto infile_1 = this->getWorkflow()->getFileByID("infile_1");
auto infile_2 = this->getWorkflow()->getFileByID("infile_2");
auto outfile_1 = this->getWorkflow()->getFileByID("outfile_1");
auto outfile_2 = this->getWorkflow()->getFileByID("outfile_2");

/* Now let's create a map of file locations, stating for each file
* where is should be read/written while the task executes */
std::map<WorkflowFile *, std::shared_ptr<FileLocation>> file_locations;

file_locations[infile_1] = FileLocation::LOCATION(storage_service2);
file_locations[infile_2] = FileLocation::LOCATION(storage_service1);
file_locations[outfile_1] = FileLocation::LOCATION(storage_service2);
file_locations[outfile_2] = FileLocation::LOCATION(storage_service2);

/* Let's create a set of "pre" file copy operations to be performed
* BEFORE the task can run */
std::vector<std::tuple<WorkflowFile *, std::shared_ptr<FileLocation> , std::shared_ptr<FileLocation> >> pre_file_copies;
pre_file_copies.emplace_back(infile_1, FileLocation::LOCATION(storage_service1), FileLocation::LOCATION(storage_service2));

/* Let's create a set of "post" file copy operations to be performed
* AFTER the task can run */
std::vector<std::tuple<WorkflowFile *, std::shared_ptr<FileLocation> , std::shared_ptr<FileLocation> >> post_file_copies;
pre_file_copies.emplace_back(outfile_1, FileLocation::LOCATION(storage_service2), FileLocation::LOCATION(storage_service1));

/* Let's create a set of file deletion operations to be performed
* AFTER the "post" file copies have been performed */
std::vector<std::tuple<WorkflowFile *, std::shared_ptr<FileLocation> >> cleanup_file_deletions;
cleanup_file_deletions.push_back(std::make_tuple(outfile_2, FileLocation::LOCATION(storage_service2)));

/* Create the standard job */
auto job = job_manager->createStandardJob({task}, file_locations, pre_file_copies, post_file_copies, cleanup_file_deletions);

/* Submit the job to the compute service */
job_manager->submitJob(job, compute_service);

/* Wait for a workflow execution event and process it. In this case we know that
* the event will be a StandardJobCompletionEvent, which is processed by the method
* processEventStandardJobCompletion() that this class overrides. */
this->waitForAndProcessNextEvent();

WRENCH_INFO("Workflow execution complete");
return 0;
}

/**
 * @brief Handler for a standard job completion event: logs which task completed
 *
 * @param event: the completion event, whose standard_job field is the completed job
 */
void ComplexJobWMS::processEventStandardJobCompletion(std::shared_ptr<StandardJobCompletedEvent> event) {
    /* The job in this example wraps exactly one task, so only the task at
     * index 0 is looked at (at() throws if the job has no task at all) */
    auto completed_task = event->standard_job->getTasks().at(0);
    WRENCH_INFO("Notified that a standard job has completed task %s", completed_task->getID().c_str());
}

/**
 * @brief Handler for a standard job failure event: logs the failure, then aborts
 *
 * @param event: the failure event, whose standard_job field is the failed job and
 *               whose failure_cause field describes what went wrong
 *
 * @throw std::runtime_error always, after the failure has been logged
 */
void ComplexJobWMS::processEventStandardJobFailure(std::shared_ptr<StandardJobFailedEvent> event) {
    /* The job in this example wraps exactly one task: the one at index 0 */
    auto failed_task = event->standard_job->getTasks().at(0);

    /* Keep both strings alive in named locals for the duration of the log call */
    const std::string task_id = failed_task->getID();
    const std::string cause = event->failure_cause->toString();

    /* Report the failure... */
    WRENCH_INFO("Notified that a standard job has failed for task %s with error %s",
                task_id.c_str(),
                cause.c_str());

    /* ...and give up: this example makes no attempt at recovering */
    throw std::runtime_error("ABORTING DUE TO JOB FAILURE");
}


}
Loading

0 comments on commit 97d7e56

Please sign in to comment.