diff --git a/.gitignore b/.gitignore index 571a12c5873..c7d6e562544 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # common scala config *~ +.metals/ .DS_Store .artifactory .bsp @@ -50,3 +51,5 @@ tesk_application.conf **/venv/ exome_germline_single_sample_v1.3/ **/*.pyc +.scalafmt.conf +.vscode/ diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala index f1662db858b..e23ecd277af 100644 --- a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala @@ -228,6 +228,8 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit data.commandsToWaitFor.flatten.headOption match { case Some(command: IoCopyCommand) => logCacheHitCopyCommand(command) + case Some(command: IoTouchCommand) => + logCacheHitTouchCommand(command) case huh => log.warning(s"BT-322 {} unexpected commandsToWaitFor: {}", jobTag, huh) } @@ -307,6 +309,9 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit case _ => } + private def logCacheHitTouchCommand(command: IoTouchCommand): Unit = + log.info(s"BT-322 {} cache hit for file : {}", jobTag, command.toString) + def succeedAndStop(returnCode: Option[Int], copiedJobOutputs: CallOutputs, detritusMap: DetritusMap): State = { import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), startMetadataKeyValues) diff --git a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala index 645600d1613..306f39337dd 100644 --- a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala +++ b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala @@ -30,11 +30,12 @@ */ package cromwell.filesystems.s3.batch -import cromwell.core.io.{IoCommandBuilder, IoContentAsStringCommand, IoIsDirectoryCommand, IoReadLinesCommand, IoWriteCommand, PartialIoCommandBuilder} +import cromwell.core.io.{IoCommandBuilder, IoHashCommand, IoContentAsStringCommand, IoIsDirectoryCommand, IoReadLinesCommand, IoWriteCommand, PartialIoCommandBuilder} import cromwell.core.path.BetterFileMethods.OpenOptions import cromwell.core.path.Path import cromwell.filesystems.s3.S3Path import org.slf4j.{Logger, LoggerFactory} +import cromwell.core.io.DefaultIoCommand.DefaultIoHashCommand import scala.util.Try @@ -77,8 +78,9 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder case (src: S3Path, dest: S3Path) => Try(S3BatchCopyCommand(src, dest)) } - override def hashCommand: PartialFunction[Path, Try[S3BatchEtagCommand]] = { - case path: S3Path => Try(S3BatchEtagCommand(path)) + override def hashCommand: PartialFunction[Path, Try[IoHashCommand]] = { + case s3_path: S3Path => Try(S3BatchEtagCommand(s3_path).asInstanceOf[IoHashCommand]) + case local_path: Path => Try(DefaultIoHashCommand(local_path)) } override def touchCommand: PartialFunction[Path, Try[S3BatchTouchCommand]] = { diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala 
b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index 8d2464a8ee3..87ef19e80cb 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -33,6 +33,7 @@ package cromwell.backend.impl.aws import java.net.SocketTimeoutException import java.io.FileNotFoundException +import java.nio.file.Paths import akka.actor.ActorRef import akka.pattern.AskSupport @@ -46,11 +47,12 @@ import common.util.StringUtil._ import common.validation.Validation._ import cromwell.backend._ -import cromwell.backend.async._ //{ExecutionHandle, PendingExecutionHandle} +import cromwell.backend.async._ import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus} import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus} import cromwell.backend.impl.aws.io._ + import cromwell.backend.io.DirectoryFunctions import cromwell.backend.io.JobPaths import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob} @@ -80,6 +82,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NoStackTrace import scala.util.{Success, Try, Failure} + /** * The `AwsBatchAsyncBackendJobExecutionActor` creates and manages a job. The job itself is encapsulated by the * functionality in `AwsBatchJob` @@ -175,6 +178,10 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar * commandScriptContents here */ + /* part of the full commandScriptContents is overridden here, in the context of mixed S3/EFS support with globbing; how much of it is needed remains to be seen.
+ */ + lazy val cmdScript = configuration.fileSystem match { case AWSBatchStorageSystems.s3 => commandScriptContents.toEither.toOption.get case _ => execScript @@ -239,6 +246,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk)) } + } /** @@ -250,12 +258,17 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar override protected def relativeLocalizationPath(file: WomFile): WomFile = { file.mapFile(value => getPath(value) match { - case Success(path) => + // for s3 paths : + case Success(path: S3Path) => configuration.fileSystem match { - case AWSBatchStorageSystems.s3 => path.pathWithoutScheme - case _ => path.toString + case AWSBatchStorageSystems.s3 => + path.pathWithoutScheme + case _ => + path.toString } - case _ => value + // non-s3 paths + case _ => + value } ) } @@ -287,11 +300,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case womFile: WomFile => womFile } } - + val callInputInputs = callInputFiles flatMap { case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true) } - + // this is a list : AwsBatchInput(name_in_wf, origin_such_as_s3, target_in_docker_relative, target_in_docker_disk[name mount] ) val scriptInput: AwsBatchInput = AwsBatchFileInput( "script", jobPaths.script.pathAsString, @@ -313,11 +326,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar def getAbsolutePath(path: Path) = { configuration.fileSystem match { case AWSBatchStorageSystems.s3 => AwsBatchWorkingDisk.MountPoint.resolve(path) - // case _ => DefaultPathBuilder.get(configuration.root).resolve(path) - case _ => - Log.info("non-s3 path detected") - Log.info(path.toString) - AwsBatchWorkingDisk.MountPoint.resolve(path) + case _ => AwsBatchWorkingDisk.MountPoint.resolve(path) } } @@ -325,7 +334,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case p if !p.isAbsolute => getAbsolutePath(p) case p => p } - disks.find(d => absolutePath.startsWith(d.mountPoint)) match { case Some(disk) => (disk.mountPoint.relativize(absolutePath), disk) case None => @@ -350,10 +358,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar ).getOrElse(List.empty[WomFile].validNel) .getOrElse(List.empty) } - val womFileOutputs = jobDescriptor.taskCall.callable.outputs.flatMap(evaluateFiles) map relativeLocalizationPath - Log.debug("WomFileOutputs:") - Log.debug(womFileOutputs.toString()) val outputs: Seq[AwsBatchFileOutput] = womFileOutputs.distinct flatMap { _.flattenFiles flatMap { case unlistedDirectory: WomUnlistedDirectory => generateUnlistedDirectoryOutputs(unlistedDirectory) @@ -363,17 +368,17 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } val additionalGlobOutput = jobDescriptor.taskCall.callable.additionalGlob.toList.flatMap(generateAwsBatchGlobFileOutputs).toSet - + outputs.toSet ++ additionalGlobOutput } + // used by generateAwsBatchOutputs, could potentially move this def within that function private def generateUnlistedDirectoryOutputs(womFile: WomUnlistedDirectory): List[AwsBatchFileOutput] = { val directoryPath = womFile.value.ensureSlashed val directoryListFile = womFile.value.ensureUnslashed + ".list" val dirDestinationPath = callRootPath.resolve(directoryPath).pathAsString val listDestinationPath = 
callRootPath.resolve(directoryListFile).pathAsString - val (_, directoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks) // We need both the collection directory and the collection list: @@ -397,6 +402,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // used by generateAwsBatchOutputs, could potentially move this def within that function private def generateAwsBatchSingleFileOutputs(womFile: WomSingleFile): List[AwsBatchFileOutput] = { + // rewrite this to create more flexibility + // val destination = configuration.fileSystem match { case AWSBatchStorageSystems.s3 => callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString case _ => DefaultPathBuilder.get(womFile.valueString) match { @@ -406,26 +413,65 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } val (relpath, disk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks) - val output = AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), destination, relpath, disk) + + val output = if (configuration.efsMntPoint.isDefined && + configuration.efsMntPoint.getOrElse("").equals(disk.toString.split(" ")(1)) && + ! runtimeAttributes.efsDelocalize) { + AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), makeSafeAwsBatchReferenceName(womFile.value), relpath, disk) + } else { + AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), destination, relpath, disk) + } List(output) } + + // derive unique glob names, locations & paths: + // 1. globName: md5 hash of the local path and the workflow id + // 2. globbedDir: local path of the directory being globbed + // 3. volume on which the globbed data is located (e.g. root, efs, ...) + // 4. delocalization target path of the glob directory + // 5. delocalization target path of the glob list file + private def generateGlobPaths(womFile: WomGlobFile): (String, String, AwsBatchVolume, String, String) = { + // add the workflow id to the hash for better conflict prevention + val wfid = standardParams.jobDescriptor.toString.split(":")(0) + val globName = GlobFunctions.globName(s"${womFile.value}-${wfid}") + val globbedDir = Paths.get(womFile.value).getParent.toString + // generalize folder and list file + val globDirectory = DefaultPathBuilder.get(globbedDir + "/." + globName + "/") + val globListFile = DefaultPathBuilder.get(globbedDir + "/." + globName + ".list") + + // locate the disk where the globbed data resides + val (_, globDirectoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks) + + val (globDirectoryDestinationPath, globListFileDestinationPath) = if (configuration.efsMntPoint.isDefined && + configuration.efsMntPoint.getOrElse("").equals(globDirectoryDisk.toString.split(" ")(1)) && + !
runtimeAttributes.efsDelocalize) { + (globDirectory, globListFile) + } else { + (callRootPath.resolve(globDirectory).pathAsString, callRootPath.resolve(globListFile).pathAsString) + } + // return results + return ( + globName, + globbedDir, + globDirectoryDisk, + globDirectoryDestinationPath.toString, + globListFileDestinationPath.toString + ) + + } // used by generateAwsBatchOutputs, could potentially move this def within that function private def generateAwsBatchGlobFileOutputs(womFile: WomGlobFile): List[AwsBatchFileOutput] = { - val globName = GlobFunctions.globName(womFile.value) - val globDirectory = globName + "/" - val globListFile = globName + ".list" - val globDirectoryDestinationPath = callRootPath.resolve(globDirectory).pathAsString - val globListFileDestinationPath = callRootPath.resolve(globListFile).pathAsString - - val (_, globDirectoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks) - + + val (globName, globbedDir, globDirectoryDisk, globDirectoryDestinationPath, globListFileDestinationPath) = generateGlobPaths(womFile) + val (relpathDir,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + "/" + "*").toString,runtimeAttributes.disks) + val (relpathList,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + ".list").toString,runtimeAttributes.disks) // We need both the glob directory and the glob list: List( - // The glob directory: - AwsBatchFileOutput(makeSafeAwsBatchReferenceName(globDirectory), globDirectoryDestinationPath, DefaultPathBuilder.get(globDirectory + "*"), globDirectoryDisk), + // The glob directory:. + AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + "/" + "*").toString,globDirectoryDestinationPath, relpathDir, globDirectoryDisk), // The glob list file: - AwsBatchFileOutput(makeSafeAwsBatchReferenceName(globListFile), globListFileDestinationPath, DefaultPathBuilder.get(globListFile), globDirectoryDisk) + AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + ".list").toString, globListFileDestinationPath, relpathList, globDirectoryDisk) ) } @@ -805,4 +851,57 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } } + + + // overrides for globbing + /** + * Returns the shell scripting for linking a glob results file. + * + * @param globFile The glob. + * @return The shell scripting. + */ + override def globScript(globFile: WomGlobFile): String = { + + val (globName, globbedDir, _, _, _) = generateGlobPaths(globFile) + val controlFileName = "cromwell_glob_control_file" + val absoluteGlobValue = commandDirectory.resolve(globFile.value).pathAsString + val globDirectory = globbedDir + "/." + globName + "/" + val globList = globbedDir + "/." + globName + ".list" + val globLinkCommand: String = (if (configuration.globLinkCommand.isDefined) { + "( " + configuration.globLinkCommand.getOrElse("").toString + " )" + + } else { + "( ln -L GLOB_PATTERN GLOB_DIRECTORY 2> /dev/null ) || ( ln GLOB_PATTERN GLOB_DIRECTORY )" + }).toString + .replaceAll("GLOB_PATTERN", absoluteGlobValue) + .replaceAll("GLOB_DIRECTORY", globDirectory) + // if on EFS : remove the globbing dir first, to remove leftover links from previous globs. 
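+ // (stale links from a previous attempt would otherwise be re-listed into the new .list file and show up as phantom glob matches)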
+ val mkDirCmd : String = if (configuration.efsMntPoint.isDefined && globDirectory.startsWith(configuration.efsMntPoint.getOrElse(""))) { + jobLogger.warn("Globbing on EFS has risks.") + jobLogger.warn(s"The globbing target (${globbedDir}/.${globName}/) will be overwritten if it already exists!") + jobLogger.warn("Consider keeping globbed outputs in the cromwell-root folder") + s"rm -Rf $globDirectory $globList && mkdir" + } else { + "mkdir" + } + + val controlFileContent = + """This file is used by Cromwell to allow for globs that would not match any file. + |By its presence it works around the limitation of some backends that do not allow empty globs. + |Regardless of the outcome of the glob, this file will not be part of the final list of globbed files. + """.stripMargin + + s"""|# make the directory which will keep the matching files + |$mkDirCmd $globDirectory + | + |# create the glob control file that will allow for the globbing to succeed even if there is 0 match + |echo "${controlFileContent.trim}" > $globDirectory/$controlFileName + | + |# hardlink or symlink all the files into the glob directory + |$globLinkCommand + | + |# list all the files (except the control file) that match the glob into a file called glob-[md5 of glob].list + |ls -1 $globDirectory | grep -v $controlFileName > $globList + |""".stripMargin + } } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala index cc019f51004..ecb6c87a457 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala @@ -61,7 +61,9 @@ case class AwsBatchAttributes(fileSystem: String, fsxMntPoint: Option[List[String]], efsMntPoint: Option[String], efsMakeMD5: Option[Boolean], - efsDelocalize: Option[Boolean] + efsDelocalize: Option[Boolean], + globLinkCommand: Option[String], + checkSiblingMd5: Option[Boolean] ) object AwsBatchAttributes { @@ -84,13 +86,11 @@ object AwsBatchAttributes { "numCreateDefinitionAttempts", "numSubmitAttempts", "default-runtime-attributes.scriptBucketName", - //"default-runtime-attributes.efsMountPoint", - //"default-runtime-attributes.efsMakeMD5", - //"default-runtime-attributes.efsDelocalize", "awsBatchRetryAttempts", "ulimits", "efsDelocalize", - "efsMakeMD5" + "efsMakeMD5", + "glob-link-command" ) private val deprecatedAwsBatchKeys: Map[String, String] = Map( @@ -108,7 +108,6 @@ object AwsBatchAttributes { val deprecatedKeys = keys.intersect(deprecated.keySet) deprecatedKeys foreach { key => logger.warn(s"Found deprecated configuration key $key, replaced with ${deprecated.get(key)}") } } - def parseFSx(config: List[String]): Option[List[String]] = { config.isEmpty match { case true => None @@ -116,7 +115,7 @@ } } - def parseEFS(config: String): Option[String] = { + def parseConfigString(config: String): Option[String] = { config.isEmpty match { case true => None case false => Some(config) } } @@ -165,7 +164,7 @@ // EFS settings: val efsMntPoint:ErrorOr[Option[String]] = validate {backendConfig.hasPath("filesystems.local.efs") match { - case true => parseEFS(backendConfig.getString("filesystems.local.efs")) + case true => parseConfigString(backendConfig.getString("filesystems.local.efs")) case false => None } } @@ -181,6 +180,20 @@ case false => None } } + // from config 
if set. + val globLinkCommand:ErrorOr[Option[String]] = validate { + backendConfig.hasPath("glob-link-command") match { + case true => Some(backendConfig.getString("glob-link-command")) + case false => None + } + } + // from config if set: + val checkSiblingMd5:ErrorOr[Option[Boolean]] = validate { + backendConfig.hasPath("filesystems.local.caching.check-sibling-md5") match { + case true => Some(backendConfig.getBoolean("filesystems.local.caching.check-sibling-md5")) + case false => None + } + } ( fileSysStr, @@ -192,7 +205,9 @@ object AwsBatchAttributes { fsxMntPoint, efsMntPoint, efsMakeMD5, - efsDelocalize + efsDelocalize, + globLinkCommand, + checkSiblingMd5 ).tupled.map((AwsBatchAttributes.apply _).tupled) match { case Valid(r) => r case Invalid(f) => diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendInitializationData.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendInitializationData.scala index edfbd560362..bff8848e037 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendInitializationData.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendInitializationData.scala @@ -33,6 +33,7 @@ package cromwell.backend.impl.aws import cromwell.backend.standard.{StandardInitializationData, StandardValidatedRuntimeAttributesBuilder} import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider +import cromwell.backend.impl.aws.io.AwsBatchWorkflowPaths case class AwsBatchBackendInitializationData ( diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendLifecycleActorFactory.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendLifecycleActorFactory.scala index c0ae9210852..acb066d7332 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendLifecycleActorFactory.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchBackendLifecycleActorFactory.scala @@ -36,7 +36,8 @@ import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationDa import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardFinalizationActor, StandardFinalizationActorParams, StandardInitializationActor, StandardInitializationActorParams, StandardLifecycleActorFactory} import cromwell.core.CallOutputs import wom.graph.CommandCallNode - +import cromwell.backend.impl.aws.callcaching.{AwsBatchBackendCacheHitCopyingActor, AwsBatchBackendFileHashingActor} +import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardFileHashingActor} /** * Factory to create `Actor` objects to manage the lifecycle of a backend job on AWS Batch. This factory provides an * object from the `AwsBatchAsyncBackendJobExecutionActor` class to create and manage the job. 
@@ -89,6 +90,12 @@ case class AwsBatchBackendLifecycleActorFactory( AwsBatchFinalizationActorParams(workflowDescriptor, ioActor, calls, configuration, jobExecutionMap, workflowOutputs, initializationDataOption) } + override lazy val cacheHitCopyingActorClassOption: Option[Class[_ <: StandardCacheHitCopyingActor]] = { + Option(classOf[AwsBatchBackendCacheHitCopyingActor]) + } + + override lazy val fileHashingActorClassOption: Option[Class[_ <: StandardFileHashingActor]] = Option(classOf[AwsBatchBackendFileHashingActor]) + override def backendSingletonActorProps(serviceRegistryActor: ActorRef): Option[Props] = { Option(AwsBatchSingletonActor.props(configuration.awsConfig.region, Option(configuration.awsAuth))) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchConfiguration.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchConfiguration.scala index 5f6d9eaf17a..8a019110509 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchConfiguration.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchConfiguration.scala @@ -60,6 +60,8 @@ class AwsBatchConfiguration(val configurationDescriptor: BackendConfigurationDes val efsMntPoint = batchAttributes.efsMntPoint val efsMakeMD5 = batchAttributes.efsMakeMD5 val efsDelocalize = batchAttributes.efsDelocalize + val globLinkCommand = batchAttributes.globLinkCommand + val checkSiblingMd5 = batchAttributes.checkSiblingMd5 } object AWSBatchStorageSystems { diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala index b75d4d6b00f..248ee90e8e3 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala @@ -37,9 +37,13 @@ import cromwell.filesystems.s3.S3PathBuilder import cromwell.filesystems.s3.S3PathBuilder.{InvalidS3Path, PossiblyValidRelativeS3Path, ValidFullS3Path} import cromwell.filesystems.s3.batch.S3BatchCommandBuilder import cromwell.core.path.{DefaultPath, Path} +import cromwell.backend.impl.aws.io._ -class AwsBatchExpressionFunctions(standardParams: StandardExpressionFunctionsParams) - extends StandardExpressionFunctions(standardParams) { + + +class AwsBatchExpressionFunctions(override val standardParams: StandardExpressionFunctionsParams) + extends StandardExpressionFunctions(standardParams) with AwsBatchGlobFunctions { + override lazy val ioCommandBuilder: IoCommandBuilder = S3BatchCommandBuilder override def preMapping(str: String) = { diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchInitializationActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchInitializationActor.scala index 16241dbb8ce..5078b5b87f8 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchInitializationActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchInitializationActor.scala @@ -47,6 +47,7 @@ import wom.graph.CommandCallNode import org.apache.commons.codec.binary.Base64 import spray.json.{JsObject, JsString} import org.slf4j.{Logger, LoggerFactory} +import cromwell.backend.impl.aws.io.AwsBatchWorkflowPaths import scala.concurrent.Future import scala.jdk.CollectionConverters._ diff --git 
a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 283b97ebd97..48f286dc436 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -305,7 +305,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL val outputCopyCommand = outputs.map { case output: AwsBatchFileOutput if output.local.pathAsString.contains("*") => "" //filter out globs case output: AwsBatchFileOutput if output.name.endsWith(".list") && output.name.contains("glob-") => - + Log.debug("Globbing: check for EFS settings.") val s3GlobOutDirectory = output.s3key.replace(".list", "") val globDirectory = output.name.replace(".list", "") /* * Need to process this glob output, the list file may or may not exist * if it doesn't exist then 'touch' it so that it can be copied otherwise later steps will get upset * about the missing file */ - s""" - |touch ${output.name} - |_s3_delocalize_with_retry ${output.name} ${output.s3key} - |if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory ; fi""".stripMargin - + if ( efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get ) { + Log.debug("EFS glob output file detected: " + output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}") + val test_cmd = if (efsDelocalize.isDefined && efsDelocalize.getOrElse(false)) { + Log.debug("delocalization on EFS is enabled") + s""" + |touch ${output.name} + |_s3_delocalize_with_retry ${output.name} ${output.s3key} + |if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory ; fi + |""".stripMargin + } else { + + // no delocalization: verify that the glob list and every globbed file exist + s""" + |# test the glob list + |test -e ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} || (echo 'output file: ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} does not exist' && exit 1) + |# test individual files. + |for F in $$(cat ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}); do + | test -e "${globDirectory}/$$F" || (echo 'globbed file: "${globDirectory}/$$F" does not exist' && exit 1) + |done + |""".stripMargin + } + // need to make md5sum? + val md5_cmd = if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) { + Log.debug("Add cmd to create MD5 sibling.") + // NOTE: this does NOT regenerate the md5 if the file has been overwritten! + s""" + |if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]] ; then + | # the glob list + | md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && exit 1 ); + | # globbed files, using the specified number of cpus for parallel processing. + | cat ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} | xargs -I% -P${runtimeAttributes.cpu.##.toString} bash -c "md5sum ${globDirectory}/% > ${globDirectory}/%.md5" + |fi + |""".stripMargin + } else { + Log.debug("MD5 not enabled: " + efsMakeMD5.getOrElse(false).toString) + "" + } + // return combined result + s""" + |${test_cmd} + |${md5_cmd} + | """.stripMargin + } else { + // default delocalization command. 
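+ // 'touch' guards against a missing glob list; the list and, if present, the glob directory are then pushed to S3 with retries.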
+ s""" + |touch ${output.name} + |_s3_delocalize_with_retry ${output.name} ${output.s3key} + |if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory ; fi""".stripMargin + } // files on /cromwell/ working dir must be delocalized case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString => //output is on working disk mount - Log.info("output Data on working disk mount" + output.local.pathAsString) + Log.debug("output Data on working disk mount" + output.local.pathAsString) s"""_s3_delocalize_with_retry $workDir/${output.local.pathAsString} ${output.s3key}""".stripMargin // file(name (full path), s3key (delocalized path), local (file basename), mount (disk details)) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobCachingActorHelper.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobCachingActorHelper.scala index a3c693855be..35ca4d0c17c 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobCachingActorHelper.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobCachingActorHelper.scala @@ -32,7 +32,7 @@ package cromwell.backend.impl.aws import akka.actor.Actor -import cromwell.backend.impl.aws.io.{AwsBatchVolume, AwsBatchWorkingDisk} +import cromwell.backend.impl.aws.io.{AwsBatchVolume, AwsBatchWorkingDisk,AwsBatchJobPaths} import cromwell.backend.standard.StandardCachingActorHelper import cromwell.core.logging.JobLogging import cromwell.core.path.Path diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobPaths.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobPaths.scala index e8d93570dbc..219b264cc93 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobPaths.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobPaths.scala @@ -29,10 +29,13 @@ * POSSIBILITY OF SUCH DAMAGE. 
*/ -package cromwell.backend.impl.aws +package cromwell.backend.impl.aws.io import cromwell.backend.BackendJobDescriptorKey import cromwell.backend.io.JobPaths +import cromwell.core.path.Path +import cromwell.core.JobKey + object AwsBatchJobPaths { val AwsBatchLogPathKey = "cromwellLog" @@ -60,4 +63,19 @@ final case class AwsBatchJobPaths(override val workflowPaths: AwsBatchWorkflowPa override def defaultStderrFilename: String = s"$logBasename-stderr.log" override def forCallCacheCopyAttempts: JobPaths = this.copy(isCallCacheCopyAttempt = true) + override lazy val callRoot = callPathBuilder(workflowPaths.workflowRoot, jobKey, isCallCacheCopyAttempt) + + def callPathBuilder(root: Path, jobKey: JobKey, isCallCacheCopyAttempt: Boolean) = { + val callName = jobKey.node.localName + val call = s"${JobPaths.CallPrefix}-${callName}" + val shard = jobKey.index map { s => s"${JobPaths.ShardPrefix}-$s" } getOrElse "" + + val retryOrCallCache = + if (isCallCacheCopyAttempt) JobPaths.CacheCopyPrefix + else if (jobKey.attempt > 1) s"${JobPaths.AttemptPrefix}-${jobKey.attempt}" + else "" + + List(call, shard, retryOrCallCache).foldLeft(root)((path, dir) => path.resolve(dir)) + + } } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchWorkflowPaths.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchWorkflowPaths.scala index 45d3eafe75d..fb2f3675951 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchWorkflowPaths.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchWorkflowPaths.scala @@ -29,7 +29,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -package cromwell.backend.impl.aws +package cromwell.backend.impl.aws.io import akka.actor.ActorSystem import com.typesafe.config.Config @@ -39,6 +39,7 @@ import cromwell.core.WorkflowOptions import cromwell.core.path.PathBuilder import cromwell.filesystems.s3.S3PathBuilderFactory import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider +import cromwell.backend.impl.aws.{AwsBatchConfiguration,AWSBatchStorageSystems} object AwsBatchWorkflowPaths { private val RootOptionKey = "aws_s3_root" @@ -70,8 +71,15 @@ case class AwsBatchWorkflowPaths(workflowDescriptor: BackendWorkflowDescriptor, override def config: Config = configuration.configurationDescriptor.backendConfig override def pathBuilders: List[PathBuilder] = { if (configuration.fileSystem == "s3") { - List(configuration.pathBuilderFactory.asInstanceOf[S3PathBuilderFactory].fromProvider(workflowOptions, provider)) + // if efs is activated : add the default (local) pathbuilders. + if (configuration.batchAttributes.efsMntPoint.isDefined) { + List(configuration.pathBuilderFactory.asInstanceOf[S3PathBuilderFactory].fromProvider(workflowOptions, provider) ) ++ WorkflowPaths.DefaultPathBuilders + } + else { + List(configuration.pathBuilderFactory.asInstanceOf[S3PathBuilderFactory].fromProvider(workflowOptions, provider) ) + } } else { - WorkflowPaths.DefaultPathBuilders} + WorkflowPaths.DefaultPathBuilders + } } } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md index 8576dcb6de9..a1cdce0ba1b 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md @@ -258,7 +258,7 @@ Cromwell EC2 instances can be equipped with a shared elastic filesystem, termed 1. 
Setup an EFS filesystem: -Following the [GenomicsWorkFlows](https://docs.opendata.aws/genomics-workflows/orchestration/cromwell/cromwell-overview.html) deployment stack and selecting "Create EFS", you will end up with an EFS filesystem accessible within the provided subnets. It is mounted by default in EC2 workers under /mnt/efs. This is specified in the launch templates USER_DATA if you want to change this. +Following the [GenomicsWorkFlows](https://docs.opendata.aws/genomics-workflows/orchestration/cromwell/cromwell-overview.html) deployment stack and selecting "Create EFS", you will end up with an EFS filesystem accessible within the provided subnets. It is mounted by default in EC2 workers under /mnt/efs. This is specified in the launch template's USER_DATA if you want to change it. *BEWARE*: for globbing to work on EFS, the mountpoint must be left as "/mnt/efs"! Next, it is recommended to change the EFS setup (in console : select EFS service, then the "SharedDataGenomics" volume, edit). Change performance settings to Enhanced/Elastic, because the bursting throughput is usually insufficient. Optionally set the lifecycle to archive infrequently accessed data and reduce costs. @@ -271,13 +271,18 @@ backend { providers { AWSBatch { config{ + // ALTER THE GLOBBING COMMAND: + // use symlinks instead of hardlinks (hardlinks are not allowed on EFS) + // "shopt -s nullglob" prevents issues with empty globbed folders. + glob-link-command = " shopt -s nullglob; ln -fs GLOB_PATTERN GLOB_DIRECTORY " + default-runtime-attributes { // keep your other settings as they are (queueArn etc) // DEFAULT EFS CONFIG // delocalize output files under /mnt/efs to the cromwell tmp bucket efsDelocalize = false - // make md5 sums of output files under /mnt/efs as part of the job + // make sibling-md5 files of output files under /mnt/efs as part of the job. see filesystems.local.caching below! efsMakeMD5 = false } filesystems { @@ -288,6 +293,11 @@ backend { local { // the mountpoint of the EFS volume within the HOST (specified in EC2 launch template) efs = "/mnt/efs" + caching { + // sibling-md5 files reduce traffic on the EFS share. By default, files are streamed & hashed in cromwell itself + // NOTE: beware of overwriting files: the md5 siblings are not automatically cleaned up or updated! + check-sibling-md5 : true + } } } @@ -298,22 +308,31 @@ backend { ``` -Now, Cromwell is able to correctly handle output files both located in the cromwell working directory (delocalized to S3), and on the EFS volume (kept, and optionally delocalized). +Now, Cromwell is able to: + +- take input from localized files in the temp-bucket (default) +- take input from files stored on EFS (no localization performed) +- generate output files located in the cromwell working directory: delocalized to the S3 tmp bucket +- generate output files located on the EFS volume: checked for presence, kept, and optionally delocalized +- generate sibling md5 files for default output files and globbed output files +- detect valid cached job outputs from previous runs, both on S3/tmp-bucket and EFS, for single files and globbed files. + +Note: generating sibling md5 files for globbed output utilizes the number of vCPUs specified for the job. 3. Current limitations: -- To read an task output file in wdl using eg read_tsv(), it must be set to output type 'String' OR 'efsDelocalize' must be enabled for the job. -- Call caching is not yet possible when using input files located on EFS. Cromwell does not crash but issues errors and skips callcaching for that task. 
- There is no unique temp/scratch folder generated per workflow ID. Data collision prevention is left to the user. - Cleanup must be done manually +- Globbing only works if EFS is mounted under "/mnt/efs" (see config above) 4. Example Workflow The following workflow highlights the following features: - take input data from an s3 bucket. - - keep intermediate data on efs - - delocalize output from efs volume + - generate & keep intermediate data on efs + - glob files on efs + - delocalize output from efs volume to s3 - read a file on efs in the main wdl cromwell process. @@ -324,28 +343,40 @@ workflow TestEFS { # input file for WF is located on S3 File s3_file = 's3://aws-quickstart/quickstart-aws-vpc/templates/aws-vpc.template.yaml' # set an input parameter holding the working dir on EFS - String efs_wd = "/mnt/efs/MyProject" + String efs_wd = "/mnt/efs/MyTestProject" } - # task one : create a file on efs. + # task one : create a file and a glob on efs. do not delocalize call task_one {input: infile = s3_file, wd = efs_wd } - # read the outfile straight in a wdl structure - Array[Array[String]] myOutString_info = read_tsv(task_one.outfile) + # read the outfile on EFS straight in a wdl structure + Array[Array[String]] step1_info = read_tsv(task_one.efs_file) - # task two : reuse the file on the wd and delocalize to s3 + # task two : reuse the file on the wd and delocalize to s3 (via runtime setting) call task_two {input: wd = efs_wd, - infile = task_one.outfile + infile = task_one.efs_file + } + Array[Array[String]] step2_info = read_tsv(task_two.outfile) + + # run a task on the various files (get md5). + call task_three {input: + wd = efs_wd, + infiles = task_one.file_list, + f = task_two.outfile } - Array[Array[String]] myOutFile_info = read_tsv(task_two.outfile) + Array[String] step3_md5s = task_three.md5 ## outputs output{ - Array[Array[String]] wf_out_info_returned_as_string = myOutString_info - Array[Array[String]] wf_out_info_returned_as_file = myOutFile_info - String wf_out_file = task_two.outfile + Array[Array[String]] wf_out_info_step1 = step1_info + File wf_out_efs_file = task_one.efs_file + File wf_out_s3_file = task_one.s3_file + Array[File] wf_out_globList_out = task_one.file_list + + File wf_out_file_step2 = task_two.outfile + Array[String] wf_out_step3 = step3_md5s } } @@ -356,21 +387,28 @@ task task_one { } command { # mk the wd: - mkdir -p ~{wd} + mkdir -p ~{wd}/StuffToGlob + # create files + for i in A B C D E F G H I J K L M N O P Q R; do + echo $i > ~{wd}/StuffToGlob/$i.txt + done # mv the infile to wd mv ~{infile} ~{wd}/ - # generate an outfile for output Testing + # generate an outfile for output Testing on EFS ls -alh ~{wd} > ~{wd}/MyOutFile + # and on the temp bucket. 
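+ # (files created in the container working dir land in the S3 temp bucket; files under ~{wd} stay on EFS)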
+ ls -alh ~{wd} > MyRegularS3File.txt } runtime { docker: "ubuntu:22.04" cpu : "1" - memory: "500M" + memory: "500M" } output { - # to read a file in cromwell/wdl : pass it back as a string or delocalize (see task_two) - String outfile = '~{wd}/MyOutFile' - } + File efs_file = '~{wd}/MyOutFile' + File s3_file = "MyRegularS3File.txt" + Array[File] file_list = glob("~{wd}/StuffToGlob/*.txt") + } } task task_two { @@ -379,18 +417,38 @@ task task_two { File infile } command { - # put something new in the file: - ls -alh /tmp > ~{infile} - + # another derived file: + ls -alh /tmp > ~{infile}.step2 } runtime { docker: "ubuntu:22.04" cpu : "1" memory: "500M" efsDelocalize: true + + } + output { + File outfile = "~{infile}.step2" + } +} + +task task_three { + input { + String wd + Array[File] infiles + File f + } + command { + #get checksums for globbed + extra infile + md5sum ~{sep=' ' infiles} ~{f} + } + runtime { + docker: "ubuntu:22.04" + cpu : "1" + memory: "500M" } output { - File outfile = "~{infile}" + Array[String] md5 = read_lines(stdout()) } } ``` diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala index 86bfd46e754..1495e480358 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala @@ -30,10 +30,9 @@ */ package cromwell.backend.impl.aws.callcaching -import com.google.cloud.storage.contrib.nio.CloudStorageOptions import common.util.TryUtil import cromwell.backend.BackendInitializationData -import cromwell.backend.impl.aws.{AWSBatchStorageSystems, AwsBatchBackendInitializationData} +import cromwell.backend.impl.aws.{AWSBatchStorageSystems, AwsBatchBackendInitializationData,AwsBatchJobCachingActorHelper} import cromwell.backend.io.JobPaths import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardCacheHitCopyingActorParams} import cromwell.core.CallOutputs @@ -46,7 +45,7 @@ import wom.values.WomFile import scala.language.postfixOps import scala.util.Try -class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyingActorParams) extends StandardCacheHitCopyingActor(standardParams) { +class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyingActorParams) extends StandardCacheHitCopyingActor(standardParams) with AwsBatchJobCachingActorHelper{ private val batchAttributes = BackendInitializationData .as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption) .configuration.batchAttributes @@ -74,8 +73,9 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin } } + // detritus files : job script, stdout, stderr and RC files. 
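+ // on a cache hit these are processed alongside the job outputs so the copied call directory is complete.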
override def processDetritus(sourceJobDetritusFiles: Map[String, String] - ): Try[(Map[String, Path], Set[IoCommand[_]])] = + ): Try[(Map[String, Path], Set[IoCommand[_]])] = { (batchAttributes.fileSystem, cachingStrategy) match { case (AWSBatchStorageSystems.s3, UseOriginalCachedOutputs) => // apply getPath on each detritus string file @@ -93,7 +93,7 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin } case (_, _) => super.processDetritus(sourceJobDetritusFiles) } - + } override protected def additionalIoCommands(sourceCallRootPath: Path, originalSimpletons: Seq[WomValueSimpleton], newOutputs: CallOutputs, @@ -113,7 +113,7 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin S3BatchCommandBuilder.writeCommand( path = jobPaths.forCallCacheCopyAttempts.callExecutionRoot / "call_caching_placeholder.txt", content = content, - options = Seq(CloudStorageOptions.withMimeType("text/plain")), + options = Seq(), ).get )) case (AWSBatchStorageSystems.s3, CopyCachedOutputs) => List.empty diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala index f731fb1b44a..41cb1dae97f 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala @@ -36,6 +36,9 @@ import cromwell.backend.BackendInitializationData import cromwell.backend.impl.aws.AwsBatchBackendInitializationData import cromwell.backend.impl.aws.AWSBatchStorageSystems import cromwell.core.io.DefaultIoCommandBuilder +import scala.util.Try +import cromwell.backend.standard.callcaching.StandardFileHashingActor.SingleFileHashRequest +import cromwell.core.path.DefaultPathBuilder class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorParams) extends StandardFileHashingActor(standardParams) { @@ -44,4 +47,27 @@ class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorPa case AWSBatchStorageSystems.s3 => S3BatchCommandBuilder case _ => DefaultIoCommandBuilder } + // get backend config. + val aws_config = BackendInitializationData.as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption).configuration + + // custom strategy to handle efs (local) files, in case sibling-md5 file is present. + override def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = { + val file = DefaultPathBuilder.get(fileRequest.file.valueString) + if (aws_config.efsMntPoint.isDefined && file.toString.startsWith(aws_config.efsMntPoint.getOrElse("--")) && aws_config.checkSiblingMd5.getOrElse(false)) { + // check existence of the sibling file + val md5 = file.sibling(s"${file.toString}.md5") + if (md5.exists) { + // read the file. + val md5_value: Option[String] = Some(md5.contentAsString.split("\\s+")(0)) + md5_value.map(str => Try(str)) + } else { + // No sibling found, fall back to default. 
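+ // (the default strategy streams the file through Cromwell and hashes its content)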
+ None + } + + } else { + // Detected non-EFS file: return None + None + } + } } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala new file mode 100644 index 00000000000..18505f6d866 --- /dev/null +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, + * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package cromwell.backend.impl.aws.io + + +//import cats.implicits._ +//import common.validation.ErrorOr.ErrorOr +//import cromwell.backend.BackendJobDescriptor +//import cromwell.core.CallContext +//import cromwell.core.io.AsyncIoFunctions +//import wom.expression.{IoFunctionSet, IoFunctionSetAdapter} +//import wom.graph.CommandCallNode +import wom.values._ +import cromwell.backend.io._ +import cromwell.backend.standard._ +import scala.concurrent.Future +import java.nio.file.Paths +import cromwell.core.path.DefaultPathBuilder + +//import cromwell.backend.impl.aws.AwsBatchAttributes +//import cromwell.backend.impl.aws._ + +trait AwsBatchGlobFunctions extends GlobFunctions { + + def standardParams: StandardExpressionFunctionsParams + + + + /** + * Returns a list of path from the glob. + * + * The paths are read from a list file based on the pattern. + * + * @param pattern The pattern of the glob. This is the same "glob" passed to globPath(). + * @return The paths that match the pattern. + */ + override def glob(pattern: String): Future[Seq[String]] = { + // get access to globName() + import GlobFunctions._ + + // GOAL : + // - get config (backend / runtime / ...) here to evaluate if efsMntPoint is set & if efs delocalization is set. + // - according to those values : write the pattern as s3:// or as local path. + // - get the wf id from the config settings. + + // for now : hard coded as local at mount point /mnt/efs. 
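+ // this matches the documented limitation that globbing requires EFS mounted at /mnt/efs (see the backend README).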
+ val wfid_regex = ".{8}-.{4}-.{4}-.{4}-.{12}".r + val wfid = callContext.root.toString.split("/").toList.filter(element => wfid_regex.pattern.matcher(element).matches()).lastOption.getOrElse("") + val globPatternName = globName(s"${pattern}-${wfid}") + val globbedDir = Paths.get(pattern).getParent.toString + val listFilePath = if (pattern.startsWith("/mnt/efs/")) { + DefaultPathBuilder.get(globbedDir + "/." + globPatternName + ".list") + } else { + callContext.root.resolve(s"${globPatternName}.list") + } + asyncIo.readLinesAsync(listFilePath.toRealPath()) map { lines => + lines.toList map { fileName => + // again : this should be config based... + if (pattern.startsWith("/mnt/efs/")) { + s"${globbedDir}/.${globPatternName}/${fileName}" + } else { + s"${callContext.root}/${globPatternName}/${fileName}" + } + } + } + } + +} \ No newline at end of file
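Two self-contained sketches of mechanisms this patch relies on; the object and method names (`SiblingMd5.lookup`, `GlobNaming.saltedGlobName`) are illustrative, not part of the patch. First, the sibling-md5 shortcut taken by `AwsBatchBackendFileHashingActor.customHashStrategy`: for a file on the EFS mount, a pre-computed `<file>.md5` sibling is preferred over re-hashing the content.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.util.Try

object SiblingMd5 {
  // Returns Some(hash) when a "<file>.md5" sibling exists on the EFS mount;
  // None makes the caller fall back to the default streaming hash.
  def lookup(file: Path, efsMntPoint: String): Option[Try[String]] = {
    val md5 = Paths.get(s"${file}.md5")
    if (file.toString.startsWith(efsMntPoint) && Files.exists(md5))
      // md5sum writes "<hash>  <path>"; keep only the hash field
      Some(Try(new String(Files.readAllBytes(md5)).split("\\s+")(0)))
    else None
  }
}
```

For example, `SiblingMd5.lookup(Paths.get("/mnt/efs/MyTestProject/MyOutFile"), "/mnt/efs")` yields the stored hash when `MyOutFile.md5` exists. Second, the salted glob naming behind `generateGlobPaths`, assuming `GlobFunctions.globName` boils down to an md5 of the pattern: appending the workflow id before hashing keeps two workflows that glob the same EFS directory from colliding on the same hidden `.glob-<md5>` folder.

```scala
import java.security.MessageDigest

object GlobNaming {
  // md5 of a UTF-8 string, rendered as lowercase hex
  def md5Hex(s: String): String =
    MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString

  // e.g. pattern "/mnt/efs/MyTestProject/StuffToGlob/*.txt" plus the workflow id
  // yields the folder "/mnt/efs/MyTestProject/.glob-<md5>/" and list ".glob-<md5>.list"
  def saltedGlobName(pattern: String, wfid: String): String =
    s"glob-${md5Hex(s"$pattern-$wfid")}"
}
```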