Skip to content

Commit

Permalink
(#2) tmp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Oct 2, 2019
1 parent 4af7589 commit 531b6eb
Show file tree
Hide file tree
Showing 24 changed files with 273 additions and 109 deletions.
13 changes: 7 additions & 6 deletions include/wrench/services/compute/ComputeService.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ namespace wrench {
/***********************/

friend class StandardJobExecutorTest;
friend class Simulation;

/***********************/
/** \endcond **/
/***********************/

friend class Simulation;

public:

Expand Down Expand Up @@ -98,7 +98,6 @@ namespace wrench {

double getFreeScratchSpaceSize();

std::shared_ptr<StorageService> getScratch();

/***********************/
/** \endcond **/
Expand All @@ -109,18 +108,18 @@ namespace wrench {
/***********************/


/**
/**
* @brief Method to submit a standard job to the service
*
*
* @param job: The job being submitted
* @param service_specific_arguments: the set of service-specific arguments
*/
virtual void
submitStandardJob(StandardJob *job, std::map<std::string, std::string> &service_specific_arguments) = 0;

/**
/**
* @brief Method to submit a pilot job to the service
*
*
* @param job: The job being submitted
* @param service_specific_arguments: the set of service-specific arguments
*/
Expand All @@ -146,6 +145,8 @@ namespace wrench {

protected:

std::shared_ptr<StorageService> getScratch();

ComputeService(const std::string &hostname,
std::string service_name,
std::string mailbox_name_prefix,
Expand Down
2 changes: 2 additions & 0 deletions include/wrench/services/storage/simple/SimpleStorageService.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ namespace wrench {
std::shared_ptr<FileLocation> dst_location,
bool success,
std::shared_ptr<FailureCause> failure_cause,
std::string answer_mailbox_if_read,
std::string answer_mailbox_if_write,
std::string answer_mailbox_if_copy,
SimulationTimestampFileCopyStart *start_timestamp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace wrench {

public:

static std::shared_ptr<FileLocation> SCRATCH;
static std::shared_ptr<FileLocation> LOCATION(std::shared_ptr<StorageService> ss);

static std::shared_ptr<FileLocation> LOCATION(std::shared_ptr<StorageService> ss,
Expand All @@ -33,6 +34,7 @@ namespace wrench {
std::shared_ptr<StorageService> getStorageService();
std::string getMountPoint();
std::string getAbsolutePathAtMountPoint();
std::string getFullAbsolutePath();

std::string toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace wrench {
WorkflowFile *file,
std::string src_mailbox,
std::shared_ptr<FileLocation> dst_location,
std::string answer_mailbox_if_read,
std::string answer_mailbox_if_write,
std::string answer_mailbox_if_copy,
unsigned long buffer_size,
SimulationTimestampFileCopyStart *start_timestamp = nullptr);
Expand All @@ -43,6 +45,8 @@ namespace wrench {
WorkflowFile *file,
std::shared_ptr<FileLocation> src_location,
std::string dst_mailbox,
std::string answer_mailbox_if_read,
std::string answer_mailbox_if_write,
std::string answer_mailbox_if_copy,
unsigned long buffer_size,
SimulationTimestampFileCopyStart *start_timestamp = nullptr);
Expand All @@ -52,6 +56,8 @@ namespace wrench {
WorkflowFile *file,
std::shared_ptr<FileLocation> src_location,
std::shared_ptr<FileLocation> dsg_location,
std::string answer_mailbox_if_read,
std::string answer_mailbox_if_write,
std::string answer_mailbox_if_copy,
unsigned long buffer_size,
SimulationTimestampFileCopyStart *start_timestamp = nullptr);
Expand All @@ -73,6 +79,8 @@ namespace wrench {
std::string dst_mailbox;
std::shared_ptr<FileLocation> dst_location;

std::string answer_mailbox_if_read;
std::string answer_mailbox_if_write;
std::string answer_mailbox_if_copy;
unsigned long buffer_size;
SimulationTimestampFileCopyStart *start_timestamp;
Expand Down
2 changes: 1 addition & 1 deletion include/wrench/simgrid_S4U_util/S4U_Simulation.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ namespace wrench {
static void readFromDisk(double num_bytes, std::string hostname, std::string absolute_path);
static double getDiskCapacity(std::string hostname, std::string mount_point);
std::set<std::string> getDisks(std::string hostname);
static bool hostHasDisk(std::string hostname, std::string mount_point);
static bool hostHasMountPoint(std::string hostname, std::string mount_point);

static void yield();
static std::string getHostProperty(std::string hostname, std::string property_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace wrench {
* @brief Destructor
*/
BareMetalComputeService::~BareMetalComputeService() {
WRENCH_INFO("IN DESTRUCTOR");
this->default_property_values.clear();
}

Expand Down
2 changes: 1 addition & 1 deletion src/wrench/services/compute/htcondor/HTCondorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ namespace wrench {
this->mailbox_name,
new ComputeServiceSubmitStandardJobRequestMessage(
answer_mailbox, job, service_specific_args,
this->getMessagePayloadValue(
this->SgetMessagePayloadValue(
HTCondorServiceMessagePayload::SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD)));
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ namespace wrench {
if (work->file_locations.find(f) != work->file_locations.end()) {
files_to_read[f] = work->file_locations[f];
} else {
if (this->scratch_space == nullptr) {
WRENCH_INFO("DUCK!!");
}
files_to_read[f] = FileLocation::LOCATION(this->scratch_space, job->getName());
this->files_stored_in_scratch.insert(f);
}
Expand Down
25 changes: 25 additions & 0 deletions src/wrench/services/storage/StorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,18 @@ namespace wrench {
file_content_message->getName() + "] message!");
}
}

//Waiting for the final ack
try {
message = S4U_Mailbox::getMessage(answer_mailbox, storage_service->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}
if (not std::dynamic_pointer_cast<StorageServiceAckMessage>(message)) {
throw std::runtime_error("StorageService::writeFile(): Received an unexpected [" +
message->getName() + "] message!");
}

}

} else {
Expand Down Expand Up @@ -360,8 +372,21 @@ namespace wrench {
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}

//Waiting for the final ack

try {
message = S4U_Mailbox::getMessage(answer_mailbox, storage_service->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}
if (not std::dynamic_pointer_cast<StorageServiceAckMessage>(message)) {
throw std::runtime_error("StorageService::writeFile(): Received an unexpected [" +
message->getName() + "] message!");
}
}


} else {
throw std::runtime_error("StorageService::writeFile(): Received an unexpected [" +
message->getName() + "] message!");
Expand Down
4 changes: 3 additions & 1 deletion src/wrench/services/storage/StorageServiceMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ namespace wrench {
std::string data_write_mailbox_name,
double payload) : StorageServiceMessage(
"FILE_WRITE_ANSWER", payload) {
if ((file == nullptr) || (location == nullptr) || (data_write_mailbox_name == "") ||
if ((file == nullptr) || (location == nullptr) ||
(success && (data_write_mailbox_name.empty())) ||
(!success && (!data_write_mailbox_name.empty())) ||
(success && (failure_cause != nullptr)) || (!success && (failure_cause == nullptr))) {
throw std::invalid_argument(
"StorageServiceFileWriteAnswerMessage::StorageServiceFileWriteAnswerMessage(): Invalid arguments");
Expand Down
12 changes: 12 additions & 0 deletions src/wrench/services/storage/StorageServiceMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ namespace wrench {

};



/**
* @brief A message sent to a StorageService to write a file
*/
Expand Down Expand Up @@ -286,6 +288,16 @@ namespace wrench {
bool last_chunk;
};


/**
* @brief A message sent by a StorageService as an ack
*/
class StorageServiceAckMessage : public StorageServiceMessage {
public:
StorageServiceAckMessage() : StorageServiceMessage("ACK",0) {}
};


/***********************/
/** \endcond */
/***********************/
Expand Down
39 changes: 31 additions & 8 deletions src/wrench/services/storage/simple/SimpleStorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ namespace wrench {
for (auto const &fs : this->file_systems) {
message += "\n - " + fs.first + ":" + std::to_string(fs.second->getTotalCapacity()/(1000*1000*1000)) + "GB";
}
WRENCH_INFO("%s", message.c_str());


/** Main loop **/
Expand Down Expand Up @@ -205,6 +204,7 @@ namespace wrench {
msg->answer_mailbox, msg->start_timestamp);

} else if (auto msg = std::dynamic_pointer_cast<FileTransferThreadNotificationMessage>(message)) {

return processFileTransferThreadNotification(
msg->file_transfer_thread,
msg->file,
Expand All @@ -214,6 +214,8 @@ namespace wrench {
msg->dst_location,
msg->success,
msg->failure_cause,
msg->answer_mailbox_if_read,
msg->answer_mailbox_if_write,
msg->answer_mailbox_if_copy,
msg->start_time_stamp);
} else {
Expand Down Expand Up @@ -266,14 +268,15 @@ namespace wrench {
}

if (failure_cause == nullptr) {

auto fs = this->file_systems[location->getMountPoint()].get();

if (not fs->doesDirectoryExist(location->getAbsolutePathAtMountPoint())) {
fs->createDirectory(location->getAbsolutePathAtMountPoint());
}

// Update occupied space, in advance (will have to be decreased later in case of failure)
fs->decreaseFreeSpace(file->getSize());
fs->reserveSpace(file, location->getAbsolutePathAtMountPoint());

// Generate a mailbox_name name on which to receive the file
std::string file_reception_mailbox = S4U_Mailbox::generateUniqueMailboxName("file_reception");
Expand All @@ -297,6 +300,8 @@ namespace wrench {
file_reception_mailbox,
location,
"",
answer_mailbox,
"",
buffer_size));
ftt->simulation = this->simulation;

Expand Down Expand Up @@ -380,6 +385,8 @@ namespace wrench {
file,
location,
mailbox_to_receive_the_file_content,
answer_mailbox,
"",
"",
buffer_size));
ftt->simulation = this->simulation;
Expand Down Expand Up @@ -469,7 +476,7 @@ namespace wrench {
}
return true;
}
fs->decreaseFreeSpace(file->getSize());
fs->reserveSpace(file, dst_location->getAbsolutePathAtMountPoint());
}


Expand All @@ -484,6 +491,8 @@ namespace wrench {
file,
src_location,
dst_location,
"",
"",
answer_mailbox,
this->buffer_size, start_timestamp));
ftt->simulation = this->simulation;
Expand Down Expand Up @@ -517,6 +526,8 @@ namespace wrench {
* @param dst_location: the transfer's destination location (or nullptr if destination was not a location)
* @param success: whether the transfer succeeded or not
* @param failure_cause: the failure cause (nullptr if success)
* @param answer_mailbox_if_read: the mailbox to send a read notification ("" if not a copy)
* @param answer_mailbox_if_write: the mailbox to send a write notification ("" if not a copy)
* @param answer_mailbox_if_copy: the mailbox to send a copy notification ("" if not a copy)
* @param start_timestamp: a start file copy time stamp
* @return false if the daemon should terminate
Expand All @@ -529,6 +540,8 @@ namespace wrench {
std::shared_ptr<FileLocation> dst_location,
bool success,
std::shared_ptr<FailureCause> failure_cause,
std::string answer_mailbox_if_read,
std::string answer_mailbox_if_write,
std::string answer_mailbox_if_copy,
SimulationTimestampFileCopyStart *start_timestamp) {

Expand All @@ -539,7 +552,6 @@ namespace wrench {
this->running_file_transfer_threads.erase(ftt);
}


// Was the destination me?
if (dst_location and (dst_location->getStorageService().get() == this)) {
if (success) {
Expand All @@ -552,11 +564,23 @@ namespace wrench {
}
} else {
// Process the failure, meaning, just un-decrease the free space
this->file_systems[dst_location->getAbsolutePathAtMountPoint()]->increaseFreeSpace(file->getSize());
this->file_systems[dst_location->getMountPoint()]->unreserveSpace(file, dst_location->getAbsolutePathAtMountPoint());
}
}

// Send back the corresponding ack if this was a copy
// Send back the relevant ack if this was a read
if (not answer_mailbox_if_read.empty()) {
WRENCH_INFO("Sending back an ack since this was a file read and some client is waiting for me to say something");
S4U_Mailbox::dputMessage(answer_mailbox_if_read, new StorageServiceAckMessage());
}

// Send back the relevant ack if this was a write
if (not answer_mailbox_if_write.empty()) {
WRENCH_INFO("Sending back an ack since this was a file write and some client is waiting for me to say something");
S4U_Mailbox::dputMessage(answer_mailbox_if_write, new StorageServiceAckMessage());
}

// Send back the relevant ack if this was a copy
if (not answer_mailbox_if_copy.empty()) {
WRENCH_INFO("Sending back an ack since this was a file copy and some client is waiting for me to say something");
if ((src_location == nullptr) or (dst_location == nullptr)) {
Expand All @@ -575,7 +599,6 @@ namespace wrench {
SimpleStorageServiceMessagePayload::FILE_COPY_ANSWER_MESSAGE_PAYLOAD)));
}


return true;
}

Expand All @@ -592,7 +615,7 @@ namespace wrench {
std::string answer_mailbox) {
std::shared_ptr<FailureCause> failure_cause = nullptr;

auto fs = this->file_systems[location->getAbsolutePathAtMountPoint()].get();
auto fs = this->file_systems[location->getMountPoint()].get();

if ((not fs->doesDirectoryExist(location->getAbsolutePathAtMountPoint())) or
(not fs->isFileInDirectory(file, location->getAbsolutePathAtMountPoint()))) {
Expand Down
Loading

0 comments on commit 531b6eb

Please sign in to comment.