Skip to content

Commit

Permalink
Develop aws (#29)
Browse files Browse the repository at this point in the history
* stuck on globbing

* efs works, no callcaching

* update readme

* extended EFS support

* fix for globbing in nested scatters

* updated config for globbing, to prevent issues with empty folders
  • Loading branch information
geertvandeweyer authored May 9, 2023
1 parent 8982e07 commit a45a323
Show file tree
Hide file tree
Showing 18 changed files with 479 additions and 90 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# common scala config
*~
.metals/
.DS_Store
.artifactory
.bsp
Expand Down Expand Up @@ -50,3 +51,5 @@ tesk_application.conf
**/venv/
exome_germline_single_sample_v1.3/
**/*.pyc
.scalafmt.conf
.vscode/
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
data.commandsToWaitFor.flatten.headOption match {
case Some(command: IoCopyCommand) =>
logCacheHitCopyCommand(command)
case Some(command: IoTouchCommand) =>
logCacheHitTouchCommand(command)
case huh =>
log.warning(s"BT-322 {} unexpected commandsToWaitFor: {}", jobTag, huh)
}
Expand Down Expand Up @@ -307,6 +309,9 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
case _ =>
}

private def logCacheHitTouchCommand(command: IoTouchCommand): Unit =
log.info(s"BT-322 {} cache hit for file : {}", jobTag, command.toString)

def succeedAndStop(returnCode: Option[Int], copiedJobOutputs: CallOutputs, detritusMap: DetritusMap): State = {
import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter
serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), startMetadataKeyValues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
*/
package cromwell.filesystems.s3.batch

import cromwell.core.io.{IoCommandBuilder, IoContentAsStringCommand, IoIsDirectoryCommand, IoReadLinesCommand, IoWriteCommand, PartialIoCommandBuilder}
import cromwell.core.io.{IoCommandBuilder, IoHashCommand, IoContentAsStringCommand, IoIsDirectoryCommand, IoReadLinesCommand, IoWriteCommand, PartialIoCommandBuilder}
import cromwell.core.path.BetterFileMethods.OpenOptions
import cromwell.core.path.Path
import cromwell.filesystems.s3.S3Path
import org.slf4j.{Logger, LoggerFactory}
import cromwell.core.io.DefaultIoCommand.DefaultIoHashCommand

import scala.util.Try

Expand Down Expand Up @@ -77,8 +78,9 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder
case (src: S3Path, dest: S3Path) => Try(S3BatchCopyCommand(src, dest))
}

override def hashCommand: PartialFunction[Path, Try[S3BatchEtagCommand]] = {
case path: S3Path => Try(S3BatchEtagCommand(path))
override def hashCommand: PartialFunction[Path, Try[IoHashCommand]] = {
case s3_path: S3Path => Try(S3BatchEtagCommand(s3_path).asInstanceOf[IoHashCommand])
case local_path: Path => Try(DefaultIoHashCommand(local_path))
}

override def touchCommand: PartialFunction[Path, Try[S3BatchTouchCommand]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ package cromwell.backend.impl.aws

import java.net.SocketTimeoutException
import java.io.FileNotFoundException
import java.nio.file.Paths

import akka.actor.ActorRef
import akka.pattern.AskSupport
Expand All @@ -46,11 +47,12 @@ import common.util.StringUtil._
import common.validation.Validation._

import cromwell.backend._
import cromwell.backend.async._ //{ExecutionHandle, PendingExecutionHandle}
import cromwell.backend.async._
import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest
import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus}
import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus}
import cromwell.backend.impl.aws.io._

import cromwell.backend.io.DirectoryFunctions
import cromwell.backend.io.JobPaths
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
Expand Down Expand Up @@ -80,6 +82,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
import scala.util.{Success, Try, Failure}

/**
* The `AwsBatchAsyncBackendJobExecutionActor` creates and manages a job. The job itself is encapsulated by the
* functionality in `AwsBatchJob`
Expand Down Expand Up @@ -175,6 +178,10 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
* commandScriptContents here
*/

/* part of the full commandScriptContents is overriden here, in the context of mixed S3/EFS support with globbing.
we'll see how much we need...
*/

lazy val cmdScript = configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => commandScriptContents.toEither.toOption.get
case _ => execScript
Expand Down Expand Up @@ -239,6 +246,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk))
}

}

/**
Expand All @@ -250,12 +258,17 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
override protected def relativeLocalizationPath(file: WomFile): WomFile = {
file.mapFile(value =>
getPath(value) match {
case Success(path) =>
// for s3 paths :
case Success(path: S3Path) =>
configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => path.pathWithoutScheme
case _ => path.toString
case AWSBatchStorageSystems.s3 =>
path.pathWithoutScheme
case _ =>
path.toString
}
case _ => value
// non-s3 paths
case _ =>
value
}
)
}
Expand Down Expand Up @@ -287,11 +300,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case womFile: WomFile => womFile
}
}

val callInputInputs = callInputFiles flatMap {
case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true)
}

// this is a list : AwsBatchInput(name_in_wf, origin_such_as_s3, target_in_docker_relative, target_in_docker_disk[name mount] )
val scriptInput: AwsBatchInput = AwsBatchFileInput(
"script",
jobPaths.script.pathAsString,
Expand All @@ -313,19 +326,14 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
def getAbsolutePath(path: Path) = {
configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => AwsBatchWorkingDisk.MountPoint.resolve(path)
// case _ => DefaultPathBuilder.get(configuration.root).resolve(path)
case _ =>
Log.info("non-s3 path detected")
Log.info(path.toString)
AwsBatchWorkingDisk.MountPoint.resolve(path)
case _ => AwsBatchWorkingDisk.MountPoint.resolve(path)
}
}

val absolutePath = DefaultPathBuilder.get(path) match {
case p if !p.isAbsolute => getAbsolutePath(p)
case p => p
}

disks.find(d => absolutePath.startsWith(d.mountPoint)) match {
case Some(disk) => (disk.mountPoint.relativize(absolutePath), disk)
case None =>
Expand All @@ -350,10 +358,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
).getOrElse(List.empty[WomFile].validNel)
.getOrElse(List.empty)
}

val womFileOutputs = jobDescriptor.taskCall.callable.outputs.flatMap(evaluateFiles) map relativeLocalizationPath
Log.debug("WomFileOutputs:")
Log.debug(womFileOutputs.toString())
val outputs: Seq[AwsBatchFileOutput] = womFileOutputs.distinct flatMap {
_.flattenFiles flatMap {
case unlistedDirectory: WomUnlistedDirectory => generateUnlistedDirectoryOutputs(unlistedDirectory)
Expand All @@ -363,17 +368,17 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}

val additionalGlobOutput = jobDescriptor.taskCall.callable.additionalGlob.toList.flatMap(generateAwsBatchGlobFileOutputs).toSet

outputs.toSet ++ additionalGlobOutput
}


// used by generateAwsBatchOutputs, could potentially move this def within that function
private def generateUnlistedDirectoryOutputs(womFile: WomUnlistedDirectory): List[AwsBatchFileOutput] = {
val directoryPath = womFile.value.ensureSlashed
val directoryListFile = womFile.value.ensureUnslashed + ".list"
val dirDestinationPath = callRootPath.resolve(directoryPath).pathAsString
val listDestinationPath = callRootPath.resolve(directoryListFile).pathAsString

val (_, directoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)

// We need both the collection directory and the collection list:
Expand All @@ -397,6 +402,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

// used by generateAwsBatchOutputs, could potentially move this def within that function
private def generateAwsBatchSingleFileOutputs(womFile: WomSingleFile): List[AwsBatchFileOutput] = {
// rewrite this to create more flexibility
//
val destination = configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString
case _ => DefaultPathBuilder.get(womFile.valueString) match {
Expand All @@ -406,26 +413,65 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

}
val (relpath, disk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)
val output = AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), destination, relpath, disk)

val output = if (configuration.efsMntPoint.isDefined &&
configuration.efsMntPoint.getOrElse("").equals(disk.toString.split(" ")(1)) &&
! runtimeAttributes.efsDelocalize) {
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), makeSafeAwsBatchReferenceName(womFile.value), relpath, disk)
} else {
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), destination, relpath, disk)
}
List(output)
}

// get a unique glob name locations & paths.
// 1. globName :md5 hash of local PATH and WF_ID
// 2. globbedDir : local path of the directory being globbed.
// 3. volume on which the globbed data is located (eg root, efs, ...)
// 4. target path for delocalization for globDir
// 5. target path for delocalization for globList
private def generateGlobPaths(womFile: WomGlobFile): (String, String, AwsBatchVolume,String, String) = {
// add workflow id to hash for better conflict prevention
val wfid = standardParams.jobDescriptor.toString.split(":")(0)
val globName = GlobFunctions.globName(s"${womFile.value}-${wfid}")
val globbedDir = Paths.get(womFile.value).getParent.toString
// generalize folder and list file
val globDirectory = DefaultPathBuilder.get(globbedDir + "/." + globName + "/")
val globListFile = DefaultPathBuilder.get(globbedDir + "/." + globName + ".list")

// locate the disk where the globbed data resides
val (_, globDirectoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)


val (globDirectoryDestinationPath, globListFileDestinationPath) = if (configuration.efsMntPoint.isDefined &&
configuration.efsMntPoint.getOrElse("").equals(globDirectoryDisk.toString.split(" ")(1)) &&
! runtimeAttributes.efsDelocalize) {
(globDirectory, globListFile)
} else {
(callRootPath.resolve(globDirectory).pathAsString, callRootPath.resolve(globListFile).pathAsString)
}
// return results
return (
globName,
globbedDir,
globDirectoryDisk,
globDirectoryDestinationPath.toString,
globListFileDestinationPath.toString
)

}
// used by generateAwsBatchOutputs, could potentially move this def within that function
private def generateAwsBatchGlobFileOutputs(womFile: WomGlobFile): List[AwsBatchFileOutput] = {
val globName = GlobFunctions.globName(womFile.value)
val globDirectory = globName + "/"
val globListFile = globName + ".list"
val globDirectoryDestinationPath = callRootPath.resolve(globDirectory).pathAsString
val globListFileDestinationPath = callRootPath.resolve(globListFile).pathAsString

val (_, globDirectoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)


val (globName, globbedDir, globDirectoryDisk, globDirectoryDestinationPath, globListFileDestinationPath) = generateGlobPaths(womFile)
val (relpathDir,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + "/" + "*").toString,runtimeAttributes.disks)
val (relpathList,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + ".list").toString,runtimeAttributes.disks)
// We need both the glob directory and the glob list:
List(
// The glob directory:
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(globDirectory), globDirectoryDestinationPath, DefaultPathBuilder.get(globDirectory + "*"), globDirectoryDisk),
// The glob directory:.
AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + "/" + "*").toString,globDirectoryDestinationPath, relpathDir, globDirectoryDisk),
// The glob list file:
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(globListFile), globListFileDestinationPath, DefaultPathBuilder.get(globListFile), globDirectoryDisk)
AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + ".list").toString, globListFileDestinationPath, relpathList, globDirectoryDisk)
)
}

Expand Down Expand Up @@ -805,4 +851,57 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
}



// overrides for globbing
/**
* Returns the shell scripting for linking a glob results file.
*
* @param globFile The glob.
* @return The shell scripting.
*/
override def globScript(globFile: WomGlobFile): String = {

val (globName, globbedDir, _, _, _) = generateGlobPaths(globFile)
val controlFileName = "cromwell_glob_control_file"
val absoluteGlobValue = commandDirectory.resolve(globFile.value).pathAsString
val globDirectory = globbedDir + "/." + globName + "/"
val globList = globbedDir + "/." + globName + ".list"
val globLinkCommand: String = (if (configuration.globLinkCommand.isDefined) {
"( " + configuration.globLinkCommand.getOrElse("").toString + " )"

} else {
"( ln -L GLOB_PATTERN GLOB_DIRECTORY 2> /dev/null ) || ( ln GLOB_PATTERN GLOB_DIRECTORY )"
}).toString
.replaceAll("GLOB_PATTERN", absoluteGlobValue)
.replaceAll("GLOB_DIRECTORY", globDirectory)
// if on EFS : remove the globbing dir first, to remove leftover links from previous globs.
val mkDirCmd : String = if (configuration.efsMntPoint.isDefined && globDirectory.startsWith(configuration.efsMntPoint.getOrElse(""))) {
jobLogger.warn("Globbing on EFS has risks.")
jobLogger.warn(s"The globbing target (${globbedDir}/.${globName}/) will be overwritten when existing!")
jobLogger.warn("Consider keeping globbed outputs in the cromwell-root folder")
s"rm -Rf $globDirectory $globList && mkdir"
} else {
"mkdir"
}

val controlFileContent =
"""This file is used by Cromwell to allow for globs that would not match any file.
|By its presence it works around the limitation of some backends that do not allow empty globs.
|Regardless of the outcome of the glob, this file will not be part of the final list of globbed files.
""".stripMargin

s"""|# make the directory which will keep the matching files
|$mkDirCmd $globDirectory
|
|# create the glob control file that will allow for the globbing to succeed even if there is 0 match
|echo "${controlFileContent.trim}" > $globDirectory/$controlFileName
|
|# hardlink or symlink all the files into the glob directory
|$globLinkCommand
|
|# list all the files (except the control file) that match the glob into a file called glob-[md5 of glob].list
|ls -1 $globDirectory | grep -v $controlFileName > $globList
|""".stripMargin
}
}
Loading

0 comments on commit a45a323

Please sign in to comment.