Skip to content

Commit

Permalink
(#2) test passing++
Browse files Browse the repository at this point in the history
Refactoring stageFile()
  • Loading branch information
henricasanova committed Oct 10, 2019
1 parent 6d9c122 commit c4e4c64
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/wrench/services/compute/ComputeService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ namespace wrench {
* @brief Terminate a previously-submitted job (which may or may not be running yet)
*
* @param job: the job to terminate
*
*
* @throw std::invalid_argument
* @throw WorkflowExecutionException
* @throw std::runtime_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ namespace wrench {

this->acquireDaemonLock();


// Kill all Workunit executors
for (auto const &wue : this->running_workunit_executors) {
wue->kill(job_termination);
Expand Down Expand Up @@ -564,6 +565,7 @@ namespace wrench {
workunit_executor->start(workunit_executor, true, false); // Daemonized, no auto-restart
// This is an error on the target host!!
} catch (std::shared_ptr<HostError> &e) {
this->releaseDaemonLock();
throw std::runtime_error(
"BareMetalComputeService::dispatchReadyWorkunits(): got a host error on the target host - this shouldn't happen");
}
Expand Down Expand Up @@ -673,6 +675,7 @@ namespace wrench {
std::shared_ptr<Workunit> workunit) {

// Don't kill me while I am doing this
WRENCH_INFO("HERE1: QCQUIRING");
this->acquireDaemonLock();

// Update core availabilities
Expand Down Expand Up @@ -784,6 +787,7 @@ namespace wrench {


// Don't kill me while I am doing this

this->acquireDaemonLock();

WRENCH_INFO("A workunit executor has failed to complete a workunit on behalf of job '%s'",
Expand Down Expand Up @@ -813,6 +817,7 @@ namespace wrench {
}
}
if (!found_it) {
this->releaseDaemonLock();
throw std::runtime_error(
"StandardJobExecutor::processWorkunitExecutorFailure(): couldn't find a recently failed workunit in the running workunit list");
}
Expand All @@ -821,6 +826,7 @@ namespace wrench {
// Deal with running work units!
for (auto const &wu : this->running_workunits) {
if ((not wu->post_file_copies.empty()) || (not wu->pre_file_copies.empty())) {
this->releaseDaemonLock();
throw std::runtime_error(
"StandardJobExecutor::processWorkunitExecutorFailure(): trying to cancel a running workunit that's doing some file copy operations - not supported (for now)");
}
Expand All @@ -833,6 +839,8 @@ namespace wrench {
}
}

this->releaseDaemonLock();

// Send the notification back
try {
S4U_Mailbox::putMessage(this->callback_mailbox,
Expand All @@ -845,7 +853,6 @@ namespace wrench {
// do nothing
}

this->releaseDaemonLock();
}


Expand Down
11 changes: 4 additions & 7 deletions src/wrench/services/storage/StorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,11 @@ namespace wrench {

auto fs = this->file_systems[mountpoint].get();

if (not fs->hasEnoughFreeSpace(file->getSize())) {
throw std::runtime_error("StorageService::stageFile(): Not enough storage space at location to stage file");
}

if (not fs->doesDirectoryExist(directory)) {
fs->createDirectory(directory);
try {
fs->stageFile(file, directory);
} catch (std::exception &e) {
throw;
}
fs->storeFileInDirectory(file, directory);
}

/**
Expand Down
9 changes: 8 additions & 1 deletion src/wrench/services/storage/simple/SimpleStorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,18 @@ namespace wrench {

TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_CYAN);

// "Start" all logical file systems
for (auto const &fs : this->file_systems) {
fs.second->init();
}


std::string message = "Simple Storage service %s starting on host %s:";
for (auto const &fs : this->file_systems) {
message += "\n - " + fs.first + ":" + std::to_string(fs.second->getTotalCapacity()/(1000*1000*1000)) + "GB";
message += "\n\t- " + fs.first + ": " + std::to_string(fs.second->getFreeSpace()/(1000*1000*1000)) + "/" +
std::to_string(fs.second->getTotalCapacity()/(1000*1000*1000)) + "GB";
}
WRENCH_INFO("%s", message.c_str());


/** Main loop **/
Expand Down
74 changes: 64 additions & 10 deletions src/wrench/services/storage/storage_helpers/LogicalFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ namespace wrench {

mount_point = FileLocation::sanitizePath("/" + mount_point + "/");

// Check uniqueness
if (LogicalFileSystem::mount_points.find(hostname+":"+mount_point) != LogicalFileSystem::mount_points.end()) {
throw std::invalid_argument("LogicalFileSystem::LogicalFileSystem(): A FileSystem with mount point " +
mount_point + " at host " + hostname + " already exists");
// Check validity
if (not S4U_Simulation::hostHasMountPoint(hostname, mount_point)) {
throw std::invalid_argument("LogicalFileSystem::LogicalFileSystem(): Host " +
hostname + " does not have a disk mounted at " + mount_point);
}

if (mount_point != "/") { // "/" is obviously a prefix, but it's ok
Expand All @@ -40,24 +40,38 @@ namespace wrench {
}
// WRENCH_INFO("COMPARING %s TO %s", (hostname + ":" + mount_point).c_str(), mp.c_str());
if ((mp.find(hostname + ":" + mount_point) == 0) or ((hostname + ":" + mount_point).find(mp) == 0)) {
throw std::invalid_argument("LogicalFileSystem::LogicalFileSystem(): An existing mount point that "
"has as prefix or is a prefix of '" + mount_point +
"' already exists at host " + hostname);
throw std::invalid_argument(
"LogicalFileSystem::LogicalFileSystem(): An existing mount point that has as prefix or is a prefix of '" +
mount_point + "' already exists at host " + hostname);
}
}
}


LogicalFileSystem::mount_points.insert(hostname+":"+mount_point);

this->hostname = hostname;
this->mount_point = mount_point;
this->content["/"] = {};
this->total_capacity = S4U_Simulation::getDiskCapacity(hostname, mount_point);
this->occupied_space = 0;
this->initialized = false;
}



/**
* @brief Initializes the Logical File System (must be called before any other operation on this file system)
*/
void LogicalFileSystem::init() {
// Check uniqueness
if (LogicalFileSystem::mount_points.find(hostname+":"+mount_point) != LogicalFileSystem::mount_points.end()) {
throw std::invalid_argument("LogicalFileSystem::init(): A FileSystem with mount point " +
mount_point + " at host " + hostname + " already exists");
}
LogicalFileSystem::mount_points.insert(hostname+":"+mount_point);
this->initialized = true;
}

/**
* @brief Create a new directory
*
Expand All @@ -66,6 +80,7 @@ namespace wrench {
*/
void LogicalFileSystem::createDirectory(std::string absolute_path) {

assertInitHasBeenCalled();
assertDirectoryDoesNotExist(absolute_path);
this->content[absolute_path] = {};
}
Expand All @@ -76,6 +91,7 @@ namespace wrench {
* @return true if the directory exists
*/
bool LogicalFileSystem::doesDirectoryExist(std::string absolute_path) {
assertInitHasBeenCalled();
return (this->content.find(absolute_path) != this->content.end());
}

Expand All @@ -88,6 +104,7 @@ namespace wrench {
*/
bool LogicalFileSystem::isDirectoryEmpty(std::string absolute_path) {

assertInitHasBeenCalled();
assertDirectoryExist(absolute_path);
return (this->content[absolute_path].empty());
}
Expand All @@ -99,6 +116,7 @@ namespace wrench {
* @throw std::invalid_argument
*/
void LogicalFileSystem::removeEmptyDirectory(std::string absolute_path) {
assertInitHasBeenCalled();
assertDirectoryExist(absolute_path);
assertDirectoryIsEmpty(absolute_path);
this->content.erase(absolute_path);
Expand All @@ -111,12 +129,13 @@ namespace wrench {
* @throw std::invalid_argument
*/
void LogicalFileSystem::storeFileInDirectory(WorkflowFile *file, std::string absolute_path) {
assertInitHasBeenCalled();
// If directory does not exit, create it
if (not doesDirectoryExist(absolute_path)) {
createDirectory(absolute_path);
}

this->content[absolute_path].insert(file);
this->content[absolute_path].insert(file);
std::string key = FileLocation::sanitizePath(absolute_path) + file->getID();
if (this->reserved_space.find(key) == this->reserved_space.end()) {
this->occupied_space += file->getSize();
Expand All @@ -133,6 +152,7 @@ namespace wrench {
* @throw std::invalid_argument
*/
void LogicalFileSystem::removeFileFromDirectory(WorkflowFile *file, std::string absolute_path) {
assertInitHasBeenCalled();
assertDirectoryExist(absolute_path);
assertFileIsInDirectory(file, absolute_path);
this->content[absolute_path].erase(file);
Expand All @@ -146,6 +166,7 @@ namespace wrench {
* @throw std::invalid_argument
*/
void LogicalFileSystem::removeAllFilesInDirectory(std::string absolute_path) {
assertInitHasBeenCalled();
assertDirectoryExist(absolute_path);
this->content[absolute_path].clear();
}
Expand All @@ -160,10 +181,12 @@ namespace wrench {
* @throw std::invalid_argument
*/
bool LogicalFileSystem::isFileInDirectory(WorkflowFile *file, std::string absolute_path) {
assertInitHasBeenCalled();
// If directory does not exist, say "no"
if (not doesDirectoryExist(absolute_path)) {
return false;
}

return (this->content[absolute_path].find(file) != this->content[absolute_path].end());
}

Expand All @@ -176,6 +199,7 @@ namespace wrench {
* @throw std::invalid_argument
*/
std::set<WorkflowFile *> LogicalFileSystem::listFilesInDirectory(std::string absolute_path) {
assertInitHasBeenCalled();
assertDirectoryExist(absolute_path);
return this->content[absolute_path];
}
Expand All @@ -185,6 +209,7 @@ namespace wrench {
* @return the total capacity
*/
double LogicalFileSystem::getTotalCapacity() {
assertInitHasBeenCalled();
return this->total_capacity;
}

Expand All @@ -194,6 +219,7 @@ namespace wrench {
* @return true if the number of bytes can fit
*/
bool LogicalFileSystem::hasEnoughFreeSpace(double bytes) {
assertInitHasBeenCalled();
return (this->total_capacity - this->occupied_space) >= bytes;
}

Expand All @@ -202,6 +228,7 @@ namespace wrench {
* @return the free space in bytes
*/
double LogicalFileSystem::getFreeSpace() {
assertInitHasBeenCalled();
return (this->total_capacity - this->occupied_space);
}

Expand All @@ -212,14 +239,15 @@ namespace wrench {
* @throw std::invalid_argument
*/
void LogicalFileSystem::reserveSpace(WorkflowFile *file, std::string absolute_path) {
assertInitHasBeenCalled();
std::string key = FileLocation::sanitizePath(absolute_path) + file->getID();

if (this->total_capacity - this->occupied_space < file->getSize()) {
throw std::invalid_argument("LogicalFileSystem::reserveSpace(): Not enough free space");
}
if (this->reserved_space.find(key) != this->reserved_space.end()) {
WRENCH_WARN("LogicalFileSystem::reserveSpace(): Space was already being reserved for storing file %s at path %s:%s. This is likely a redundant copy",
file->getID().c_str(), this->hostname.c_str(), absolute_path.c_str());
file->getID().c_str(), this->hostname.c_str(), absolute_path.c_str());
} else {
this->reserved_space[key] = file->getSize();
this->occupied_space += file->getSize();
Expand All @@ -233,6 +261,7 @@ namespace wrench {
* @throw std::invalid_argument
*/
void LogicalFileSystem::unreserveSpace(WorkflowFile *file, std::string absolute_path) {
assertInitHasBeenCalled();
std::string key = FileLocation::sanitizePath(absolute_path) + file->getID();

if (this->reserved_space.find(key) == this->reserved_space.end()) {
Expand All @@ -250,4 +279,29 @@ namespace wrench {
this->occupied_space -= file->getSize();
}


/**
* @brief Stage file in directory
* @param absolute_path: the directory's absolute path (at the mount point)
*
* @throw std::invalid_argument
*/
void LogicalFileSystem::stageFile(WorkflowFile *file, std::string absolute_path) {

// If Space is not sufficient, forget it
if (this->occupied_space + file->getSize() > this->total_capacity) {
throw std::invalid_argument("LogicalFileSystem::stageFile(): Insufficient space to store file " +
file->getID() + " at " + this->hostname + ":" + absolute_path);
}

absolute_path = wrench::FileLocation::sanitizePath(absolute_path);

// If directory does not exit, create it
if (this->content.find(absolute_path) == this->content.end()) {
this->content[absolute_path] = {};
}

this->content[absolute_path].insert(file);
this->occupied_space += file->getSize();
}
}
13 changes: 13 additions & 0 deletions src/wrench/services/storage/storage_helpers/LogicalFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ namespace wrench {

explicit LogicalFileSystem(std::string hostname, std::string mount_point);

void init();

double getTotalCapacity();
bool hasEnoughFreeSpace(double bytes);
double getFreeSpace();
Expand All @@ -49,6 +51,10 @@ namespace wrench {
std::map<std::string, std::set<WorkflowFile*>> content;
private:

friend class StorageService;

void stageFile(WorkflowFile *file, std::string absolute_path);

static std::set<std::string> mount_points;


Expand All @@ -58,6 +64,13 @@ namespace wrench {
double occupied_space;
std::map<std::string, double> reserved_space;

bool initialized;

void assertInitHasBeenCalled() {
if (not this->initialized) {
throw std::runtime_error("LogicalFileSystem::assertInitHasBeenCalled(): A logical file system needs to be initialized before it's been called");
}
}
void assertDirectoryExist(std::string absolute_path) {
if (not this->doesDirectoryExist(absolute_path)) {
throw std::invalid_argument("LogicalFileSystem::assertDirectoryExists(): directory " + absolute_path + " does not exist");
Expand Down
3 changes: 2 additions & 1 deletion src/wrench/simgrid_S4U_util/S4U_Simulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ namespace wrench {
if (!p) {
p = "/";
}
if (std::string(p) == mount_point) {

if (FileLocation::sanitizePath(std::string(p)) == FileLocation::sanitizePath(mount_point)) {
return true;
}
}
Expand Down
2 changes: 2 additions & 0 deletions test/compute_services/StandardJobExecutorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2437,6 +2437,8 @@ class JobTerminationTestDuringAComputationWMS : public wrench::WMS {
auto job_manager = this->createJobManager();


wrench::StorageService::deleteFile(this->getWorkflow()->getFileByID("input_file"),
wrench::FileLocation::LOCATION(this->test->storage_service1));
/** Create a 4-task job and kill it **/
{
wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1.1", 3600, 6, 6, 1.0, 0);
Expand Down
Loading

0 comments on commit c4e4c64

Please sign in to comment.