diff --git a/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp b/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp index 038d3abcde..2a0697cb31 100644 --- a/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp +++ b/src/wrench/services/compute/bare_metal/BareMetalComputeService.cpp @@ -1147,7 +1147,7 @@ namespace wrench { this->releaseDaemonLock(); - // If the job not done, just return + // If the job is not done, just return if (this->completed_workunits[job].size() != this->all_workunits[job].size()) { return; } diff --git a/src/wrench/services/compute/work_unit_executor/WorkunitExecutor.cpp b/src/wrench/services/compute/work_unit_executor/WorkunitExecutor.cpp index 5d3f801e2c..2a3a66f076 100644 --- a/src/wrench/services/compute/work_unit_executor/WorkunitExecutor.cpp +++ b/src/wrench/services/compute/work_unit_executor/WorkunitExecutor.cpp @@ -370,6 +370,7 @@ namespace wrench { files_to_write[f] = work->file_locations[f]; } else { files_to_write[f] = FileLocation::LOCATION(this->scratch_space, this->scratch_space->getMountPoint() + "/" + job->getName()); + this->files_stored_in_scratch.insert(f); } } StorageService::writeFiles(files_to_write); diff --git a/src/wrench/services/storage/StorageService.cpp b/src/wrench/services/storage/StorageService.cpp index ea0ae07ca0..714de5c705 100644 --- a/src/wrench/services/storage/StorageService.cpp +++ b/src/wrench/services/storage/StorageService.cpp @@ -662,7 +662,7 @@ namespace wrench { if (this->hasMultipleMountPoints()) { throw std::invalid_argument("StorageService::getAbsolutePath(): The storage service has more than one mount point"); } - return this->file_systems.begin()->first; + return wrench::FileLocation::sanitizePath(this->file_systems.begin()->first); } diff --git a/src/wrench/services/storage/simple/SimpleStorageService.cpp b/src/wrench/services/storage/simple/SimpleStorageService.cpp index 5f37cc8fc9..e280d96a4c 100644 --- a/src/wrench/services/storage/simple/SimpleStorageService.cpp +++ b/src/wrench/services/storage/simple/SimpleStorageService.cpp @@ -617,8 +617,11 @@ namespace wrench { if ((not fs->doesDirectoryExist(location->getAbsolutePathAtMountPoint())) or (not fs->isFileInDirectory(file, location->getAbsolutePathAtMountPoint()))) { - failure_cause = std::shared_ptr( - new FileNotFound(file, location)); + // If this is scratch, we don't care, perhaps it was taken care of elsewhere... + if (not this->isScratch()) { + failure_cause = std::shared_ptr( + new FileNotFound(file, location)); + } } else { fs->removeFileFromDirectory(file, location->getAbsolutePathAtMountPoint()); } diff --git a/src/wrench/services/storage/storage_helpers/LogicalFileSystem.cpp b/src/wrench/services/storage/storage_helpers/LogicalFileSystem.cpp index b112405edf..34b8e2bcbd 100644 --- a/src/wrench/services/storage/storage_helpers/LogicalFileSystem.cpp +++ b/src/wrench/services/storage/storage_helpers/LogicalFileSystem.cpp @@ -116,7 +116,7 @@ namespace wrench { createDirectory(absolute_path); } - this->content[absolute_path].insert(file); + this->content[absolute_path].insert(file); std::string key = FileLocation::sanitizePath(absolute_path) + file->getID(); if (this->reserved_space.find(key) == this->reserved_space.end()) { this->occupied_space += file->getSize(); diff --git a/test/compute_services/ScratchSpaceTest.cpp b/test/compute_services/ScratchSpaceTest.cpp index cafb21a676..9622f98920 100644 --- a/test/compute_services/ScratchSpaceTest.cpp +++ b/test/compute_services/ScratchSpaceTest.cpp @@ -32,6 +32,8 @@ class ScratchSpaceTest : public ::testing::Test { std::shared_ptr compute_service2 = nullptr; wrench::Simulation *simulation; + void do_BogusScratchSpace_test(); + void do_SimpleScratchSpace_test(); void do_ScratchSpaceFailure_test(); @@ -47,83 +49,51 @@ class ScratchSpaceTest : public ::testing::Test { protected: ScratchSpaceTest() { - // Create the simplest workflow - workflow = std::unique_ptr(new wrench::Workflow()); - - // Create a four-host 10-core platform file - std::string xml = "" - "" - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - " " - ""; - FILE *platform_file = fopen(platform_file_path.c_str(), "w"); - fprintf(platform_file, "%s", xml.c_str()); - fclose(platform_file); + // Create the simplest workflow + workflow = std::unique_ptr(new wrench::Workflow()); + + // Create a four-host 10-core platform file + std::string xml = "" + "" + " " + " "; + for (int i=1; i <= 4; i++) { + xml += " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " " + " "; + } + xml += " " + " " + " " + " " + " " + " " + " " + " " + ""; + FILE *platform_file = fopen(platform_file_path.c_str(), "w"); + fprintf(platform_file, "%s", xml.c_str()); + fclose(platform_file); } @@ -132,6 +102,47 @@ class ScratchSpaceTest : public ::testing::Test { }; +/**********************************************************************/ +/** BOGUS SCRATCH SPACE TEST **/ +/**********************************************************************/ + +TEST_F(ScratchSpaceTest, BogusScratchSpaceTest) { + DO_TEST_WITH_FORK(do_BogusScratchSpace_test); +} + + +void ScratchSpaceTest::do_BogusScratchSpace_test() { + + + // Create and initialize a simulation + auto simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("unit_test"); + + ASSERT_NO_THROW(simulation->init(&argc, argv)); + + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; + + // Create a Compute Service + ASSERT_THROW(compute_service = simulation->add( + new wrench::BareMetalComputeService(hostname, + {std::make_pair(hostname, + std::make_tuple(wrench::ComputeService::ALL_CORES, + wrench::ComputeService::ALL_RAM))}, + "/scratch_bogus", {})), + std::invalid_argument); + + delete simulation; + + free(argv[0]); + free(argv); +} + /**********************************************************************/ /** SIMPLE SCRATCH SPACE TEST **/ /**********************************************************************/ @@ -144,7 +155,7 @@ class SimpleScratchSpaceTestWMS : public wrench::WMS { std::string hostname) : wrench::WMS(nullptr, nullptr, compute_services, {}, {}, nullptr, hostname, "test") { - this->test = test; + this->test = test; } private: @@ -152,117 +163,117 @@ class SimpleScratchSpaceTestWMS : public wrench::WMS { ScratchSpaceTest *test; int main() { - // Create a job manager - auto job_manager = this->createJobManager(); - - { - // Create a sequential task that lasts one min and requires 1 cores - wrench::WorkflowTask *task = this->getWorkflow()->addTask("task", 60, 1, 1, 1.0, 0); - task->addInputFile(this->getWorkflow()->getFileByID("input_file")); - task->addOutputFile(this->getWorkflow()->getFileByID("output_file")); - - // Create a StandardJob with some pre-copies - wrench::StandardJob *job = job_manager->createStandardJob( - {task}, - {}, - {std::make_tuple(this->getWorkflow()->getFileByID("input_file"), - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, - {}); - - // Submit the job for execution - job_manager->submitJob(job, this->test->compute_service); - - // Wait for a workflow execution event - std::shared_ptr event; - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); - } - if (std::dynamic_pointer_cast(event)) { - //sleep to make sure that the files are deleted - wrench::S4U_Simulation::sleep(100); - double free_space_size = this->test->compute_service->getFreeScratchSpaceSize(); - if (free_space_size < this->test->compute_service->getTotalScratchSpaceSize()) { - throw std::runtime_error( - "File was not deleted from scratch"); - } - } else { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } + // Create a job manager + auto job_manager = this->createJobManager(); + + { + // Create a sequential task that lasts one min and requires 1 cores + wrench::WorkflowTask *task = this->getWorkflow()->addTask("task", 60, 1, 1, 1.0, 0); + task->addInputFile(this->getWorkflow()->getFileByID("input_file")); + task->addOutputFile(this->getWorkflow()->getFileByID("output_file")); + + // Create a StandardJob with some pre-copies + wrench::StandardJob *job = job_manager->createStandardJob( + {task}, + {}, + {std::make_tuple(this->getWorkflow()->getFileByID("input_file"), + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, + {}); + + // Submit the job for execution + job_manager->submitJob(job, this->test->compute_service); + + // Wait for a workflow execution event + std::shared_ptr event; + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); } + if (std::dynamic_pointer_cast(event)) { + //sleep to make sure that the files are deleted + wrench::S4U_Simulation::sleep(100); + double free_space_size = this->test->compute_service->getFreeScratchSpaceSize(); + if (free_space_size < this->test->compute_service->getTotalScratchSpaceSize()) { + throw std::runtime_error( + "File was not deleted from scratch"); + } + } else { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); + } + } - return 0; + return 0; } }; TEST_F(ScratchSpaceTest, SimpleScratchSpaceTest) { - DO_TEST_WITH_FORK(do_SimpleScratchSpace_test); + DO_TEST_WITH_FORK(do_SimpleScratchSpace_test); } void ScratchSpaceTest::do_SimpleScratchSpace_test() { - // Create and initialize a simulation - auto simulation = new wrench::Simulation(); - int argc = 1; - auto argv = (char **) calloc(1, sizeof(char *)); - argv[0] = strdup("unit_test"); + // Create and initialize a simulation + auto simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("unit_test"); - ASSERT_NO_THROW(simulation->init(&argc, argv)); + ASSERT_NO_THROW(simulation->init(&argc, argv)); - // Setting up the platform - ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); - // Get a hostname - std::string hostname = simulation->getHostnameList()[0]; + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; - // Create a Storage Service - ASSERT_NO_THROW(storage_service1 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}))); + // Create a Storage Service + ASSERT_NO_THROW(storage_service1 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk1"}))); - // Create a Storage Service - ASSERT_NO_THROW(storage_service2 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}))); + // Create a Storage Service + ASSERT_NO_THROW(storage_service2 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk2"}))); - // Create a Compute Service - ASSERT_NO_THROW(compute_service = simulation->add( - new wrench::BareMetalComputeService(hostname, - {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, - wrench::ComputeService::ALL_RAM))}, - "/scratch", {}))); + // Create a Compute Service + ASSERT_NO_THROW(compute_service = simulation->add( + new wrench::BareMetalComputeService(hostname, + {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, + wrench::ComputeService::ALL_RAM))}, + "/scratch3000", {}))); - simulation->add(new wrench::FileRegistryService(hostname)); + simulation->add(new wrench::FileRegistryService(hostname)); - // Create a WMS - std::shared_ptr wms = nullptr;; - ASSERT_NO_THROW(wms = simulation->add( - new SimpleScratchSpaceTestWMS( - this, {compute_service}, hostname))); + // Create a WMS + std::shared_ptr wms = nullptr;; + ASSERT_NO_THROW(wms = simulation->add( + new SimpleScratchSpaceTestWMS( + this, {compute_service}, hostname))); - ASSERT_NO_THROW(wms->addWorkflow(std::move(workflow.get()))); + ASSERT_NO_THROW(wms->addWorkflow(std::move(workflow.get()))); - // Create two workflow files - wrench::WorkflowFile *input_file = this->workflow->addFile("input_file", 10000.0); - wrench::WorkflowFile *output_file = this->workflow->addFile("output_file", 20000.0); + // Create two workflow files + wrench::WorkflowFile *input_file = this->workflow->addFile("input_file", 1000.0); + wrench::WorkflowFile *output_file = this->workflow->addFile("output_file", 2000.0); - // Staging the input_file on the storage service - ASSERT_NO_THROW(simulation->stageFile(input_file, storage_service1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(input_file, storage_service1)); - // Running a "run a single task" simulation - // Note that in these tests the WMS creates workflow tasks, which a user would - // of course not be likely to do - ASSERT_NO_THROW(simulation->launch()); + // Running a "run a single task" simulation + // Note that in these tests the WMS creates workflow tasks, which a user would + // of course not be likely to do + ASSERT_NO_THROW(simulation->launch()); - delete simulation; + delete simulation; - free(argv[0]); - free(argv); + free(argv[0]); + free(argv); } @@ -278,7 +289,7 @@ class SimpleScratchSpaceFailureTestWMS : public wrench::WMS { std::string hostname) : wrench::WMS(nullptr, nullptr, compute_services, {}, {}, nullptr, hostname, "test") { - this->test = test; + this->test = test; } private: @@ -286,207 +297,207 @@ class SimpleScratchSpaceFailureTestWMS : public wrench::WMS { ScratchSpaceTest *test; int main() { - // Create a job manager - auto job_manager = this->createJobManager(); - - { - // Create a sequential task that lasts one min and requires 1 cores - wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 60, 1, 1, 1.0, 0); - task1->addInputFile(this->getWorkflow()->getFileByID("input_file1")); - - // Create a sequential task that lasts one min and requires 1 cores - wrench::WorkflowTask *task2 = this->getWorkflow()->addTask("task2", 60, 1, 1, 1.0, 0); - task2->addInputFile(this->getWorkflow()->getFileByID("input_file2")); - - // Create a StandardJob with SOME pre-copies from public storage to scratch - wrench::StandardJob *job1 = job_manager->createStandardJob( - {task1}, - {}, - {std::make_tuple(this->getWorkflow()->getFileByID("input_file1"), - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, - {}); - - - // Create a StandardJob with NO pre-copies from public storage to scratch - wrench::StandardJob *job2 = job_manager->createStandardJob( - {task2}, - {}, - {std::make_tuple(this->getWorkflow()->getFileByID("input_file2"), - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, - {}); - - // Submit the job for execution to the compute service with NO scratch, thus expecting an error - job_manager->submitJob(job1, this->test->compute_service); - - // Wait for a workflow execution event - std::shared_ptr event; - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); - } + // Create a job manager + auto job_manager = this->createJobManager(); - auto real_event = std::dynamic_pointer_cast(event); - if (real_event) { - auto cause = std::dynamic_pointer_cast(real_event->failure_cause); - if (not cause) { - throw std::runtime_error("Got a job failure event, but unexpected failure cause: " + - real_event->failure_cause->toString() + " (expected: NoScratchSpace)"); - } - cause->toString(); // for coverage - } else { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } + { + // Create a sequential task that lasts one min and requires 1 cores + wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 60, 1, 1, 1.0, 0); + task1->addInputFile(this->getWorkflow()->getFileByID("input_file1")); - // Submit the job for execution to the compute service which has some scratch, but not enough space, thus expecting an exception - job_manager->submitJob(job1, this->test->compute_service1); - // Wait for a workflow execution event - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); - } + // Create a sequential task that lasts one min and requires 1 cores + wrench::WorkflowTask *task2 = this->getWorkflow()->addTask("task2", 60, 1, 1, 1.0, 0); + task2->addInputFile(this->getWorkflow()->getFileByID("input_file2")); - auto real_event2 = std::dynamic_pointer_cast(event); - if (real_event2) { - auto cause = std::dynamic_pointer_cast(real_event2->failure_cause); - if (not cause) { - throw std::runtime_error("Got a job failure event, but unexpected failure cause: " + - real_event2->failure_cause->toString() + " (expected: StorageServiceNotEnoughSpace)"); - } - } else { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } + // Create a StandardJob with SOME pre-copies from public storage to scratch + wrench::StandardJob *job1 = job_manager->createStandardJob( + {task1}, + {}, + {std::make_tuple(this->getWorkflow()->getFileByID("input_file1"), + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, + {}); - // Submit two jobs for execution to the same compute service which has just enough scratch for only one job and so - // one should succeed and the second one should fail - job_manager->submitJob(job1, this->test->compute_service2); - wrench::S4U_Simulation::sleep(1); - job_manager->submitJob(job2, this->test->compute_service2); - // Wait for a workflow execution event - int num_events = 0; - int prev_event = -1; - int i = 0; - while (i < 2) { - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); - } - - if (std::dynamic_pointer_cast(event)) { - if (prev_event == -1) { - prev_event = 0; - num_events++; - } else if (prev_event == 1) { - num_events++; - } - } else if (std::dynamic_pointer_cast(event)) { - if (prev_event == -1) { - prev_event = 1; - num_events++; - } else if (prev_event == 0) { - num_events++; - } - } else { - throw std::runtime_error("Unexpected workflow execution event or here: " + event->toString()); - } - i++; - } - if (num_events != 2) { - throw std::runtime_error("Did not get enough execution events"); + // Create a StandardJob with NO pre-copies from public storage to scratch + wrench::StandardJob *job2 = job_manager->createStandardJob( + {task2}, + {}, + {std::make_tuple(this->getWorkflow()->getFileByID("input_file2"), + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, + {}); + + // Submit the job for execution to the compute service with NO scratch, thus expecting an error + job_manager->submitJob(job1, this->test->compute_service); + + // Wait for a workflow execution event + std::shared_ptr event; + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); + } + + auto real_event = std::dynamic_pointer_cast(event); + if (real_event) { + auto cause = std::dynamic_pointer_cast(real_event->failure_cause); + if (not cause) { + throw std::runtime_error("Got a job failure event, but unexpected failure cause: " + + real_event->failure_cause->toString() + " (expected: NoScratchSpace)"); + } + cause->toString(); // for coverage + } else { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); + } + + // Submit the job for execution to the compute service which has some scratch, but not enough space, thus expecting an exception + job_manager->submitJob(job1, this->test->compute_service1); + // Wait for a workflow execution event + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); + } + + auto real_event2 = std::dynamic_pointer_cast(event); + if (real_event2) { + auto cause = std::dynamic_pointer_cast(real_event2->failure_cause); + if (not cause) { + throw std::runtime_error("Got a job failure event, but unexpected failure cause: " + + real_event2->failure_cause->toString() + " (expected: StorageServiceNotEnoughSpace)"); + } + } else { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); + } + + // Submit two jobs for execution to the same compute service which has just enough scratch for only one job and so + // one should succeed and the second one should fail + job_manager->submitJob(job1, this->test->compute_service2); + wrench::S4U_Simulation::sleep(1); + job_manager->submitJob(job2, this->test->compute_service2); + // Wait for a workflow execution event + int num_events = 0; + int prev_event = -1; + int i = 0; + while (i < 2) { + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); + } + + if (std::dynamic_pointer_cast(event)) { + if (prev_event == -1) { + prev_event = 0; + num_events++; + } else if (prev_event == 1) { + num_events++; } + } else if (std::dynamic_pointer_cast(event)) { + if (prev_event == -1) { + prev_event = 1; + num_events++; + } else if (prev_event == 0) { + num_events++; + } + } else { + throw std::runtime_error("Unexpected workflow execution event or here: " + event->toString()); + } + i++; + } + + if (num_events != 2) { + throw std::runtime_error("Did not get enough execution events"); } + } - return 0; + return 0; } }; TEST_F(ScratchSpaceTest, SimpleScratchSpaceFailureTest) { - DO_TEST_WITH_FORK(do_ScratchSpaceFailure_test); + DO_TEST_WITH_FORK(do_ScratchSpaceFailure_test); } void ScratchSpaceTest::do_ScratchSpaceFailure_test() { - // Create and initialize a simulation - auto simulation = new wrench::Simulation(); - int argc = 1; - auto argv = (char **) calloc(1, sizeof(char *)); - argv[0] = strdup("unit_test"); + // Create and initialize a simulation + auto simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("unit_test"); - ASSERT_NO_THROW(simulation->init(&argc, argv)); + ASSERT_NO_THROW(simulation->init(&argc, argv)); - // Setting up the platform - ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); - // Get a hostname - std::string hostname = simulation->getHostnameList()[0]; + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; - // Create a Storage Service - ASSERT_NO_THROW(storage_service1 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}))); + // Create a Storage Service3 + ASSERT_NO_THROW(storage_service1 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk1"}))); - // Create a Storage Service - ASSERT_NO_THROW(storage_service2 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}))); + // Create a Storage Service + ASSERT_NO_THROW(storage_service2 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk2"}))); - // Create a Compute Service that does not have scratch space - ASSERT_NO_THROW(compute_service = simulation->add( - new wrench::BareMetalComputeService(hostname, - {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, - wrench::ComputeService::ALL_RAM))}, - ""))); + // Create a Compute Service that does not have scratch space + ASSERT_NO_THROW(compute_service = simulation->add( + new wrench::BareMetalComputeService(hostname, + {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, + wrench::ComputeService::ALL_RAM))}, + ""))); - // Create a Compute Service that has smaller scratch space than the files to be stored - ASSERT_NO_THROW(compute_service1 = simulation->add( - new wrench::BareMetalComputeService(hostname, - {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, - wrench::ComputeService::ALL_RAM))}, - "/scratch100"))); + // Create a Compute Service that has smaller scratch space than the files to be stored + ASSERT_NO_THROW(compute_service1 = simulation->add( + new wrench::BareMetalComputeService(hostname, + {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, + wrench::ComputeService::ALL_RAM))}, + "/scratch100"))); - // Create a Compute Service that has enough scratch space to store the files - ASSERT_NO_THROW(compute_service2 = simulation->add( - new wrench::BareMetalComputeService(hostname, - {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, - wrench::ComputeService::ALL_RAM))}, - "/scratch10000"))); + // Create a Compute Service that has enough scratch space to store the files + ASSERT_NO_THROW(compute_service2 = simulation->add( + new wrench::BareMetalComputeService(hostname, + {std::make_pair(hostname, std::make_tuple(wrench::ComputeService::ALL_CORES, + wrench::ComputeService::ALL_RAM))}, + "/scratch10000"))); - simulation->add(new wrench::FileRegistryService(hostname)); + simulation->add(new wrench::FileRegistryService(hostname)); - // Create a WMS - std::shared_ptr wms = nullptr;; - ASSERT_NO_THROW(wms = simulation->add( - new SimpleScratchSpaceFailureTestWMS( - this, {compute_service}, hostname))); + // Create a WMS + std::shared_ptr wms = nullptr;; + ASSERT_NO_THROW(wms = simulation->add( + new SimpleScratchSpaceFailureTestWMS( + this, {compute_service}, hostname))); - ASSERT_NO_THROW(wms->addWorkflow(std::move(workflow.get()))); + ASSERT_NO_THROW(wms->addWorkflow(std::move(workflow.get()))); - // Create two workflow files - wrench::WorkflowFile *input_file1 = this->workflow->addFile("input_file1", 10000.0); - wrench::WorkflowFile *input_file2 = this->workflow->addFile("input_file2", 10000.0); + // Create two workflow files + wrench::WorkflowFile *input_file1 = this->workflow->addFile("input_file1", 10000.0); + wrench::WorkflowFile *input_file2 = this->workflow->addFile("input_file2", 10000.0); - // Staging the input_file on the storage service - ASSERT_NO_THROW(simulation->stageFile(input_file1, storage_service1)); - ASSERT_NO_THROW(simulation->stageFile(input_file2, storage_service1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(input_file1, storage_service1)); + ASSERT_NO_THROW(simulation->stageFile(input_file2, storage_service1)); - // Running a "run a single task" simulation - // Note that in these tests the WMS creates workflow tasks, which a user would - // of course not be likely to do - ASSERT_NO_THROW(simulation->launch()); + // Running a "run a single task" simulation + // Note that in these tests the WMS creates workflow tasks, which a user would + // of course not be likely to do + ASSERT_NO_THROW(simulation->launch()); - delete simulation; + delete simulation; - free(argv[0]); - free(argv); + free(argv[0]); + free(argv); } @@ -502,7 +513,7 @@ class PilotJobScratchSpaceTestWMS : public wrench::WMS { std::string hostname) : wrench::WMS(nullptr, nullptr, compute_services, {}, {}, nullptr, hostname, "test") { - this->test = test; + this->test = test; } private: @@ -511,198 +522,198 @@ class PilotJobScratchSpaceTestWMS : public wrench::WMS { int main() { - // Create a job manager - auto job_manager = this->createJobManager(); - - auto file_registry_service = this->getAvailableFileRegistryService(); - - // Create a pilot job that requires 1 host, 1 core per host, 1 bytes of RAM per host, and 1 hour - wrench::PilotJob *pilot_job = job_manager->createPilotJob(); - - // Submit a pilot job + // Create a job manager + auto job_manager = this->createJobManager(); + + auto file_registry_service = this->getAvailableFileRegistryService(); + + // Create a pilot job that requires 1 host, 1 core per host, 1 bytes of RAM per host, and 1 hour + wrench::PilotJob *pilot_job = job_manager->createPilotJob(); + + // Submit a pilot job + try { + job_manager->submitJob(pilot_job, this->test->compute_service, {{"-N","1"}, {"-c","1"}, {"-t","60"}}); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Unexpected exception: " + e.getCause()->toString()); + } + + // Wait for the pilot job start + std::shared_ptr event; + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error( + "Error while getting and execution event: " + e.getCause()->toString()); + } + + if (not std::dynamic_pointer_cast(event)) { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); + } + + // Create a sequential task that lasts one min and requires 1 cores + wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 60, 1, 1, 1.0, 0); + task1->addInputFile(this->getWorkflow()->getFileByID("input_file1")); + + // Create a sequential task that lasts one min and requires 1 cores + wrench::WorkflowTask *task2 = this->getWorkflow()->addTask("task2", 360, 1, 1, 1.0, 0); + task2->addInputFile(this->getWorkflow()->getFileByID("input_file2")); + + // Create a sequential task that lasts one min and requires 1 cores + wrench::WorkflowTask *task3 = this->getWorkflow()->addTask("task3", 600, 1, 1, 1.0, 0); + task3->addInputFile(this->getWorkflow()->getFileByID("input_file3")); + + // Create a StandardJob with SOME pre-copies from public storage to scratch + wrench::StandardJob *job1 = job_manager->createStandardJob( + {task1}, + {}, + {std::make_tuple(this->getWorkflow()->getFileByID("input_file1"), + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, + {}); + + // Create a StandardJob with SOME pre-copies from public storage to scratch + wrench::StandardJob *job2 = job_manager->createStandardJob( + {task2}, + {}, + {std::make_tuple(this->getWorkflow()->getFileByID("input_file2"), + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, + {}); + + // Create a StandardJob with SOME pre-copies from public storage to scratch + wrench::StandardJob *job3 = job_manager->createStandardJob( + {task3}, + {}, + {std::make_tuple(this->getWorkflow()->getFileByID("input_file3"), + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, + {}); + + + // Submit the standard job for execution + try { + job_manager->submitJob(job1, pilot_job->getComputeService()); + job_manager->submitJob(job2, pilot_job->getComputeService()); + job_manager->submitJob(job3, pilot_job->getComputeService()); + } catch (std::exception &e) { + throw std::runtime_error( + "Unexpected exception while submitting standard job to pilot job: " + std::string(e.what())); + } + + int i = 0; + while (i < 3) { + // Wait for the standard job completion try { - job_manager->submitJob(pilot_job, this->test->compute_service, {{"-N","1"}, {"-c","1"}, {"-t","60"}}); + event = this->getWorkflow()->waitForNextExecutionEvent(); } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Unexpected exception: " + e.getCause()->toString()); - } - - // Wait for the pilot job start - std::shared_ptr event; - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error( - "Error while getting and execution event: " + e.getCause()->toString()); - } - - if (not std::dynamic_pointer_cast(event)) { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } - - // Create a sequential task that lasts one min and requires 1 cores - wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 60, 1, 1, 1.0, 0); - task1->addInputFile(this->getWorkflow()->getFileByID("input_file1")); - - // Create a sequential task that lasts one min and requires 1 cores - wrench::WorkflowTask *task2 = this->getWorkflow()->addTask("task2", 360, 1, 1, 1.0, 0); - task2->addInputFile(this->getWorkflow()->getFileByID("input_file2")); - - // Create a sequential task that lasts one min and requires 1 cores - wrench::WorkflowTask *task3 = this->getWorkflow()->addTask("task3", 600, 1, 1, 1.0, 0); - task3->addInputFile(this->getWorkflow()->getFileByID("input_file3")); - - // Create a StandardJob with SOME pre-copies from public storage to scratch - wrench::StandardJob *job1 = job_manager->createStandardJob( - {task1}, - {}, - {std::make_tuple(this->getWorkflow()->getFileByID("input_file1"), - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, - {}); - - // Create a StandardJob with SOME pre-copies from public storage to scratch - wrench::StandardJob *job2 = job_manager->createStandardJob( - {task2}, - {}, - {std::make_tuple(this->getWorkflow()->getFileByID("input_file2"), - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, - {}); - - // Create a StandardJob with SOME pre-copies from public storage to scratch - wrench::StandardJob *job3 = job_manager->createStandardJob( - {task3}, - {}, - {std::make_tuple(this->getWorkflow()->getFileByID("input_file3"), - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, - {}); - - - // Submit the standard job for execution - try { - job_manager->submitJob(job1, pilot_job->getComputeService()); - job_manager->submitJob(job2, pilot_job->getComputeService()); - job_manager->submitJob(job3, pilot_job->getComputeService()); - } catch (std::exception &e) { - throw std::runtime_error( - "Unexpected exception while submitting standard job to pilot job: " + std::string(e.what())); - } - - int i = 0; - while (i < 3) { - // Wait for the standard job completion - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error( - "Error while getting and execution event: " + e.getCause()->toString()); - } - if (std::dynamic_pointer_cast(event)) { - // success, check if the scratch space size is not full again or not, it should not be - double free_space_size = pilot_job->getComputeService()->getFreeScratchSpaceSize(); - if (free_space_size == 3000.0) { - throw std::runtime_error( - "Pilot Job is expected to clear its scratch space only after all the standard job finishes"); - } - } else { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } - i++; + throw std::runtime_error( + "Error while getting and execution event: " + e.getCause()->toString()); } - - // Wait for the pilot job expiration - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { + if (std::dynamic_pointer_cast(event)) { + // success, check if the scratch space size is not full again or not, it should not be + double free_space_size = pilot_job->getComputeService()->getFreeScratchSpaceSize(); + if (free_space_size == 3000.0) { throw std::runtime_error( - "Error while getting and execution event: " + e.getCause()->toString()); - } - if (std::dynamic_pointer_cast(event)) { - // success, check if the scratch space size is full again or not, it should be full - wrench::S4U_Simulation::sleep(10); //sleep for some time to ensure everything is deleted - double free_space_size = pilot_job->getComputeService()->getFreeScratchSpaceSize(); - if (free_space_size != 3000.0) { - throw std::runtime_error( - "Scratch space should be full after this pilot job expires but it is not now"); - } + "Pilot Job is expected to clear its scratch space only after all the standard job finishes"); + } } else { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); } + i++; + } + + // Wait for the pilot job expiration + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error( + "Error while getting and execution event: " + e.getCause()->toString()); + } + if (std::dynamic_pointer_cast(event)) { + // success, check if the scratch space size is full again or not, it should be full + wrench::S4U_Simulation::sleep(10); //sleep for some time to ensure everything is deleted + double free_space_size = pilot_job->getComputeService()->getFreeScratchSpaceSize(); + if (free_space_size != 3000.0) { + throw std::runtime_error( + "Scratch space should be empty after this pilot job expires but it is not now"); + } + } else { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); + } - return 0; + return 0; } }; TEST_F(ScratchSpaceTest, PilotJobScratchSpaceTest) { - DO_TEST_WITH_FORK(do_PilotJobScratchSpace_test); + DO_TEST_WITH_FORK(do_PilotJobScratchSpace_test); } void ScratchSpaceTest::do_PilotJobScratchSpace_test() { - // Create and initialize a simulation - auto simulation = new wrench::Simulation(); - int argc = 1; - auto argv = (char **) calloc(1, sizeof(char *)); - argv[0] = strdup("unit_test"); + // Create and initialize a simulation + auto simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("unit_test"); - ASSERT_NO_THROW(simulation->init(&argc, argv)); + ASSERT_NO_THROW(simulation->init(&argc, argv)); - // Setting up the platform - ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); - // Get a hostname - std::string hostname = simulation->getHostnameList()[0]; + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; - // Create a Storage Service - ASSERT_NO_THROW(storage_service1 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}))); + // Create a Storage Service + ASSERT_NO_THROW(storage_service1 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk1"}))); - // Create a Storage Service - ASSERT_NO_THROW(storage_service2 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}))); + // Create a Storage Service + ASSERT_NO_THROW(storage_service2 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk2"}))); - // Create a Compute Service that does not have scratch space - ASSERT_NO_THROW(compute_service = simulation->add( - new wrench::BatchComputeService(hostname, - {hostname}, "", {}))); + // Create a Compute Service that does not have scratch space + ASSERT_NO_THROW(compute_service = simulation->add( + new wrench::BatchComputeService(hostname, + {hostname}, "/scratch3000", {}))); - simulation->add(new wrench::FileRegistryService(hostname)); + simulation->add(new wrench::FileRegistryService(hostname)); - // Create a WMS - std::shared_ptr wms = nullptr;; - ASSERT_NO_THROW(wms = simulation->add( - new PilotJobScratchSpaceTestWMS( - this, {compute_service}, hostname))); + // Create a WMS + std::shared_ptr wms = nullptr;; + ASSERT_NO_THROW(wms = simulation->add( + new PilotJobScratchSpaceTestWMS( + this, {compute_service}, hostname))); - ASSERT_NO_THROW(wms->addWorkflow(std::move(workflow.get()))); + ASSERT_NO_THROW(wms->addWorkflow(std::move(workflow.get()))); - // Create two workflow files - wrench::WorkflowFile *input_file1 = this->workflow->addFile("input_file1", 1000.0); - wrench::WorkflowFile *input_file2 = this->workflow->addFile("input_file2", 1000.0); - wrench::WorkflowFile *input_file3 = this->workflow->addFile("input_file3", 1000.0); + // Create two workflow files + wrench::WorkflowFile *input_file1 = this->workflow->addFile("input_file1", 1000.0); + wrench::WorkflowFile *input_file2 = this->workflow->addFile("input_file2", 1000.0); + wrench::WorkflowFile *input_file3 = this->workflow->addFile("input_file3", 1000.0); - // Staging the input_file on the storage service - ASSERT_NO_THROW(simulation->stageFile(input_file1, storage_service1)); - ASSERT_NO_THROW(simulation->stageFile(input_file2, storage_service1)); - ASSERT_NO_THROW(simulation->stageFile(input_file3, storage_service1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(input_file1, storage_service1)); + ASSERT_NO_THROW(simulation->stageFile(input_file2, storage_service1)); + ASSERT_NO_THROW(simulation->stageFile(input_file3, storage_service1)); - // Running a "run a single task" simulation - // Note that in these tests the WMS creates workflow tasks, which a user would - // of course not be likely to do - ASSERT_NO_THROW(simulation->launch()); + // Running a "run a single task" simulation + // Note that in these tests the WMS creates workflow tasks, which a user would + // of course not be likely to do + ASSERT_NO_THROW(simulation->launch()); - delete simulation; + delete simulation; - free(argv[0]); - free(argv); + free(argv[0]); + free(argv); } @@ -719,7 +730,7 @@ class ScratchSpaceRaceConditionTestWMS : public wrench::WMS { const std::set> &storage_services, std::string &hostname) : wrench::WMS(nullptr, nullptr, compute_services, storage_services, {}, nullptr, hostname, "test") { - this->test = test; + this->test = test; } private: @@ -727,122 +738,122 @@ class ScratchSpaceRaceConditionTestWMS : public wrench::WMS { ScratchSpaceTest *test; int main() { - // Create a data movement manager - auto data_movement_manager = this->createDataMovementManager(); + // Create a data movement manager + auto data_movement_manager = this->createDataMovementManager(); - // Create a job manager - auto job_manager = this->createJobManager(); + // Create a job manager + auto job_manager = this->createJobManager(); - // Get a reference to the file - wrench::WorkflowFile *file = this->getWorkflow()->getFileByID("input"); + // Get a reference to the file + wrench::WorkflowFile *file = this->getWorkflow()->getFileByID("input"); - // Create three tasks - wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 10, 1, 1, 1.0, 0); // 10 seconds - wrench::WorkflowTask *task2 = this->getWorkflow()->addTask("task2", 10, 1, 1, 1.0, 0); // 10 seconds - this->getWorkflow()->addControlDependency(task1, task2); // task 1 depends on task2 - task2->addInputFile(file); + // Create three tasks + wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 10, 1, 1, 1.0, 0); // 10 seconds + wrench::WorkflowTask *task2 = this->getWorkflow()->addTask("task2", 10, 1, 1, 1.0, 0); // 10 seconds + this->getWorkflow()->addControlDependency(task1, task2); // task 1 depends on task2 + task2->addInputFile(file); - wrench::WorkflowTask *task3 = this->getWorkflow()->addTask("task3", 1, 1, 1, 1.0, 0); // 1 second + wrench::WorkflowTask *task3 = this->getWorkflow()->addTask("task3", 1, 1, 1, 1.0, 0); // 1 second - // Create a first job that: - // - copies file "input" to the scratch space - // - runs task1 and then task2 (10 second each) - // - (task 2 needs "input") - wrench::StandardJob *job1 = job_manager->createStandardJob( - {task1, task2}, {}, - {std::make_tuple(file, - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, {}); + // Create a first job that: + // - copies file "input" to the scratch space + // - runs task1 and then task2 (10 second each) + // - (task 2 needs "input") + wrench::StandardJob *job1 = job_manager->createStandardJob( + {task1, task2}, {}, + {std::make_tuple(file, + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, {}); - // Create a second job that: - // - copies file "input" to the scratch space - // - runs task3 (1 second) - wrench::StandardJob *job2 = job_manager->createStandardJob( - {task3}, {}, - {std::make_tuple(file, - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, {}); + // Create a second job that: + // - copies file "input" to the scratch space + // - runs task3 (1 second) + wrench::StandardJob *job2 = job_manager->createStandardJob( + {task3}, {}, + {std::make_tuple(file, + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, {}); - // Submit both jobs - job_manager->submitJob(job1, this->test->compute_service); - job_manager->submitJob(job2, this->test->compute_service); + // Submit both jobs + job_manager->submitJob(job1, this->test->compute_service); + job_manager->submitJob(job2, this->test->compute_service); - // Wait for workflow execution events - for (auto job : {job1, job2}) { - std::shared_ptr event; - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); - } - if (not std::dynamic_pointer_cast(event)) { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } + // Wait for workflow execution events + for (auto job : {job1, job2}) { + std::shared_ptr event; + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); + } + if (not std::dynamic_pointer_cast(event)) { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); } + } - return 0; + return 0; } }; TEST_F(ScratchSpaceTest, RaceConditionTest) { - DO_TEST_WITH_FORK(do_RaceConditionTest_test); + DO_TEST_WITH_FORK(do_RaceConditionTest_test); } void ScratchSpaceTest::do_RaceConditionTest_test() { - // Create and initialize a simulation - auto *simulation = new wrench::Simulation(); - int argc = 1; - auto argv = (char **) calloc(1, sizeof(char *)); - argv[0] = strdup("unit_test"); + // Create and initialize a simulation + auto *simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("unit_test"); - ASSERT_NO_THROW(simulation->init(&argc, argv)); + ASSERT_NO_THROW(simulation->init(&argc, argv)); - // Setting up the platform - ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); - // Get a hostname - std::string hostname = simulation->getHostnameList()[0]; + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; - // Create a Storage Service (note the BOGUS property, which is for testing puposes - // and doesn't matter because we do not stop the service) - ASSERT_NO_THROW(storage_service1 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}, - {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); + // Create a Storage Service (note the BOGUS property, which is for testing puposes + // and doesn't matter because we do not stop the service) + ASSERT_NO_THROW(storage_service1 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk1"}, + {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); - // Create a Cloud Service - ASSERT_NO_THROW(compute_service = simulation->add( - new wrench::BareMetalComputeService(hostname, {"Host1"}, "/scratch100", {}, {}))); + // Create a Cloud Service + ASSERT_NO_THROW(compute_service = simulation->add( + new wrench::BareMetalComputeService(hostname, {"Host1"}, "/scratch3000", {}, {}))); - // Create a WMS - std::shared_ptr wms = nullptr;; - ASSERT_NO_THROW(wms = simulation->add( - new ScratchSpaceRaceConditionTestWMS(this, {compute_service}, {storage_service1}, hostname))); + // Create a WMS + std::shared_ptr wms = nullptr;; + ASSERT_NO_THROW(wms = simulation->add( + new ScratchSpaceRaceConditionTestWMS(this, {compute_service}, {storage_service1}, hostname))); // wrench::Workflow *workflow = new wrench::Workflow(); - ASSERT_NO_THROW(wms->addWorkflow(this->workflow.get())); + ASSERT_NO_THROW(wms->addWorkflow(this->workflow.get())); - // Create a file registry - ASSERT_NO_THROW(simulation->add(new wrench::FileRegistryService(hostname))); + // Create a file registry + ASSERT_NO_THROW(simulation->add(new wrench::FileRegistryService(hostname))); - // Create a file - wrench::WorkflowFile *file = nullptr; - ASSERT_NO_THROW(file = workflow->addFile("input", 1)); - // Staging the input_file on the storage service - ASSERT_NO_THROW(simulation->stageFile(file, storage_service1)); + // Create a file + wrench::WorkflowFile *file = nullptr; + ASSERT_NO_THROW(file = workflow->addFile("input", 1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(file, storage_service1)); - // Running a "run a single task" simulation - ASSERT_NO_THROW(simulation->launch()); + // Running a "run a single task" simulation + ASSERT_NO_THROW(simulation->launch()); - delete simulation; - free(argv[0]); - free(argv); + delete simulation; + free(argv[0]); + free(argv); } @@ -858,7 +869,7 @@ class ScratchNonScratchPartitionsTestWMS : public wrench::WMS { const std::set> &storage_services, std::string &hostname) : wrench::WMS(nullptr, nullptr, compute_services, storage_services, {}, nullptr, hostname, "test") { - this->test = test; + this->test = test; } private: @@ -867,197 +878,206 @@ class ScratchNonScratchPartitionsTestWMS : public wrench::WMS { int main() { - //NonScratch have only / partition but other partitions can be created - //Scratch have /, / partitions - - // Create a data movement manager and this should only copy from / to / of two non scratch space - auto data_movement_manager = this->createDataMovementManager(); - - // Create a job manager - auto job_manager = this->createJobManager(); - - // Get a reference to the file - wrench::WorkflowFile *file1 = this->getWorkflow()->getFileByID("input1"); - // Get a reference to the file - wrench::WorkflowFile *file2 = this->getWorkflow()->getFileByID("input2"); - - //check if this file is staged in / partition of non-scratch - if(!test->storage_service1->lookupFile(file1, nullptr)) { //nullptr is referring to no job's partition - throw std::runtime_error( - "The file1 was supposed to be staged in / partition but is not" - ); - } - //check if this file is staged in / partition of non-scratch - if(!test->storage_service2->lookupFile(file2, nullptr)) { //nullptr is referring to no job's partition - throw std::runtime_error( - "The file2 was supposed to be staged in / partition but is not" - ); - } - - // Create a task - wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 10, 1, 1, 1.0, 0); // 10 seconds - task1->addInputFile(file1); - - // Create a first job that: - // - copies file "input" to the scratch space - // - runs task1 - wrench::StandardJob *job1 = job_manager->createStandardJob( - {task1}, {}, - {std::make_tuple(file1, - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::SCRATCH)}, - {}, {}); - - // Submit job1 - job_manager->submitJob(job1, this->test->compute_service); - - // Wait for workflow execution events - for (auto job : {job1}) { - std::shared_ptr event; - try { - event = this->getWorkflow()->waitForNextExecutionEvent(); - } catch (wrench::WorkflowExecutionException &e) { - throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); - } - if (not std::dynamic_pointer_cast(event)) { - throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); - } - } - - //the file1 should still be non-scratch space, the job should only delete file from it's scratch job's partition - //check if this file is staged in / partition of non-scratch - if(!test->storage_service1->lookupFile(file1, nullptr)) { //nullptr is referring to no job's partition - throw std::runtime_error( - "The file1 again was supposed to be staged in / partition but is not" - ); - } - - //try to copy file1 from job1's partition of storage service1 into storage service2 in / partition, this should fail - try { - wrench::StorageService::copyFile(file1, - wrench::FileLocation::LOCATION(this->test->storage_service1, - this->test->storage_service1->getMountPoint() + job1->getName()), - wrench::FileLocation::LOCATION(this->test->storage_service2)); - throw std::runtime_error( - "Non-scratch space have / partition unless created by copying something into a new partition name" - ); - } catch(wrench::WorkflowExecutionException) { - } - - //try to copy file1 from / partition of storage service1 into storage service2 in job1's partition, this should succeed - try { - wrench::StorageService::copyFile(file1, - wrench::FileLocation::LOCATION(this->test->storage_service1), - wrench::FileLocation::LOCATION(this->test->storage_service2, - this->test->storage_service1->getMountPoint() + job1->getName())); - - } catch(wrench::WorkflowExecutionException) { - throw std::runtime_error( - "We should have been able to copy from / partition of non-scratch to a new partition into another non-scratch space" - ); - } - - //try to copy file2 from / partition of storage service2 into storage service1 in / partition, it should succeed - try { - wrench::StorageService::copyFile(file2, - wrench::FileLocation::LOCATION(this->test->storage_service2), - wrench::FileLocation::LOCATION(this->test->storage_service1)); - - } catch(wrench::WorkflowExecutionException) { - throw std::runtime_error( - "We should have been able to copy from / of one non-scratch space to / of another non-scratch space" - ); - } - - //try to copy file2 from / partition of stroage service2 into storage service2 in /test partition, it should succeed + //NonScratch have only / partition but other partitions can be created + //Scratch have /, / partitions + + // Create a data movement manager and this should only copy from / to / of two non scratch space + auto data_movement_manager = this->createDataMovementManager(); + + // Create a job manager + auto job_manager = this->createJobManager(); + + // Get a reference to the file + wrench::WorkflowFile *file1 = this->getWorkflow()->getFileByID("input1"); + // Get a reference to the file + wrench::WorkflowFile *file2 = this->getWorkflow()->getFileByID("input2"); + + //check if this file is staged at mount point of non-scratch + if (not wrench::StorageService::lookupFile( + file1, + wrench::FileLocation::LOCATION(test->storage_service1))) { + throw std::runtime_error( + "The file1 was supposed to be staged at the mount point but is not" + ); + } + //check if this file is staged at mount point of non-scratch + if (not wrench::StorageService::lookupFile( + file2, + wrench::FileLocation::LOCATION(test->storage_service2))) { + throw std::runtime_error( + "The file2 was supposed to be staged in / partition but is not" + ); + } + + // Create a task + wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 10, 1, 1, 1.0, 0); // 10 seconds + task1->addInputFile(file1); + + // Create a first job that: + // - copies file "input" to the scratch space + // - runs task1 + wrench::StandardJob *job1 = job_manager->createStandardJob( + {task1}, {}, + {std::make_tuple(file1, + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::SCRATCH)}, + {}, {}); + + // Submit job1 + job_manager->submitJob(job1, this->test->compute_service); + + // Wait for workflow execution events + for (auto job : {job1}) { + std::shared_ptr event; try { - wrench::StorageService::copyFile(file2, - wrench::FileLocation::LOCATION(this->test->storage_service2), - wrench::FileLocation::LOCATION(this->test->storage_service1, - this->test->storage_service1->getMountPoint() + "/test")); - - }catch(wrench::WorkflowExecutionException) { - throw std::runtime_error( - "We should have been able to copy from one partition to another partition of the same storage service" - ); + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); } - - //we just copied file to /test partition of storage service2, so it must be there - if (!test->storage_service2->lookupFile(file2, wrench::FileLocation::LOCATION(this->test->storage_service2, - this->test->storage_service2->getMountPoint() + "/test"))) { - throw std::runtime_error( - "The file2 was supposed to be stored in /test partition but is not" - ); + if (not std::dynamic_pointer_cast(event)) { + throw std::runtime_error("Unexpected workflow execution event: " + event->toString()); } - - - return 0; + } + + //the file1 should still be non-scratch space, the job should only delete file from it's scratch job's partition + //check if this file is staged in mount point of non-scratch + if (not wrench::StorageService::lookupFile( + file1, + wrench::FileLocation::LOCATION(test->storage_service1))) { + throw std::runtime_error( + "The file1 again was supposed to be staged in / partition but is not" + ); + } + + //try to copy file1 from job1's partition of storage service1 into storage service2 in / partition, this should fail + try { + wrench::StorageService::copyFile(file1, + wrench::FileLocation::LOCATION(this->test->storage_service1, + this->test->storage_service1->getMountPoint() + job1->getName()), + wrench::FileLocation::LOCATION(this->test->storage_service2)); + throw std::runtime_error( + "Non-scratch space have / partition unless created by copying something into a new partition name" + ); + } catch(wrench::WorkflowExecutionException) { + } + + //try to copy file1 from / partition of storage service1 into storage service2 in job1's partition, this should succeed + try { + wrench::StorageService::copyFile(file1, + wrench::FileLocation::LOCATION(this->test->storage_service1), + wrench::FileLocation::LOCATION(this->test->storage_service2, + this->test->storage_service2->getMountPoint() + job1->getName())); + + } catch(wrench::WorkflowExecutionException) { + throw std::runtime_error( + "We should have been able to copy from / partition of non-scratch to a new partition into another non-scratch space" + ); + } + + //try to copy file2 from / partition of storage service2 into storage service1 at mount point, it should succeed + try { + wrench::StorageService::copyFile(file2, + wrench::FileLocation::LOCATION(this->test->storage_service2), + wrench::FileLocation::LOCATION(this->test->storage_service1)); + + } catch(wrench::WorkflowExecutionException) { + throw std::runtime_error( + "We should have been able to copy from / of one non-scratch space to / of another non-scratch space" + ); + } + + //try to copy file2 from / partition of storage service2 into storage service2 in /test directory, it should succeed + try { + wrench::StorageService::copyFile(file2, + wrench::FileLocation::LOCATION(this->test->storage_service2), + wrench::FileLocation::LOCATION(this->test->storage_service2, + this->test->storage_service2->getMountPoint() + "/test")); + + }catch(wrench::WorkflowExecutionException) { + throw std::runtime_error( + "We should have been able to copy from one partition to another partition of the same storage service" + ); + } + + //we just copied file to /test partition of storage service2, so it must be there + if (not wrench::StorageService::lookupFile( + file2, + wrench::FileLocation::LOCATION(this->test->storage_service2, + this->test->storage_service2->getMountPoint() + "/test"))) { + + throw std::runtime_error( + "The file2 was supposed to be stored in /test partition but is not" + ); + } + + + return 0; } }; TEST_F(ScratchSpaceTest, ScratchNonScratchPartitionsTest) { - DO_TEST_WITH_FORK(do_PartitionsTest_test); + DO_TEST_WITH_FORK(do_PartitionsTest_test); } void ScratchSpaceTest::do_PartitionsTest_test() { - // Create and initialize a simulation - auto *simulation = new wrench::Simulation(); - int argc = 1; - auto argv = (char **) calloc(1, sizeof(char *)); - argv[0] = strdup("unit_test"); + // Create and initialize a simulation + auto *simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("unit_test"); - ASSERT_NO_THROW(simulation->init(&argc, argv)); + ASSERT_NO_THROW(simulation->init(&argc, argv)); - // Setting up the platform - ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); - // Get a hostname - std::string hostname = simulation->getHostnameList()[0]; + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; - // Create a Storage Service (note the BOGUS property, which is for testing puposes - // and doesn't matter because we do not stop the service) - ASSERT_NO_THROW(storage_service1 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}, - {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); + // Create a Storage Service (note the BOGUS property, which is for testing puposes + // and doesn't matter because we do not stop the service) + ASSERT_NO_THROW(storage_service1 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk1"}, + {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); - // Create a Storage Service (note the BOGUS property, which is for testing puposes - // and doesn't matter because we do not stop the service) - ASSERT_NO_THROW(storage_service2 = simulation->add( - new wrench::SimpleStorageService(hostname, {"/"}, - {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); + // Create a Storage Service (note the BOGUS property, which is for testing puposes + // and doesn't matter because we do not stop the service) + ASSERT_NO_THROW(storage_service2 = simulation->add( + new wrench::SimpleStorageService(hostname, {"/disk2"}, + {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); - // Create a Cloud Service - ASSERT_NO_THROW(compute_service = simulation->add( - new wrench::BareMetalComputeService(hostname, {"Host1"}, "/scratch", {}, {}))); + // Create a Cloud Service + ASSERT_NO_THROW(compute_service = simulation->add( + new wrench::BareMetalComputeService(hostname, {"Host1"}, "/scratch3000", {}, {}))); - // Create a WMS - std::shared_ptr wms = nullptr;; - ASSERT_NO_THROW(wms = simulation->add( - new ScratchNonScratchPartitionsTestWMS(this, {compute_service}, {storage_service1}, hostname))); + // Create a WMS + std::shared_ptr wms = nullptr;; + ASSERT_NO_THROW(wms = simulation->add( + new ScratchNonScratchPartitionsTestWMS(this, {compute_service}, {storage_service1}, hostname))); // auto workflow = new wrench::Workflow(); - ASSERT_NO_THROW(wms->addWorkflow(workflow.get())); - - // Create a file registry - ASSERT_NO_THROW(simulation->add(new wrench::FileRegistryService(hostname))); - - // Create a file - wrench::WorkflowFile *file1 = nullptr; - ASSERT_NO_THROW(file1 = workflow->addFile("input1", 1)); - // Create a file - wrench::WorkflowFile *file2 = nullptr; - ASSERT_NO_THROW(file2 = workflow->addFile("input2", 1)); - // Staging the input_file on the storage service - ASSERT_NO_THROW(simulation->stageFile(file1, storage_service1)); - // Staging the input_file on the storage service - ASSERT_NO_THROW(simulation->stageFile(file2, storage_service2)); - - // Running a "run a single task" simulation - ASSERT_NO_THROW(simulation->launch()); - - delete simulation; - free(argv[0]); - free(argv); + ASSERT_NO_THROW(wms->addWorkflow(workflow.get())); + + // Create a file registry + ASSERT_NO_THROW(simulation->add(new wrench::FileRegistryService(hostname))); + + // Create a file + wrench::WorkflowFile *file1 = nullptr; + ASSERT_NO_THROW(file1 = workflow->addFile("input1", 1)); + // Create a file + wrench::WorkflowFile *file2 = nullptr; + ASSERT_NO_THROW(file2 = workflow->addFile("input2", 1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(file1, storage_service1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(file2, storage_service2)); + + // Running a "run a single task" simulation + ASSERT_NO_THROW(simulation->launch()); + + delete simulation; + free(argv[0]); + free(argv); }