diff --git a/backend/src/main/scala/cromwell/backend/backend.scala b/backend/src/main/scala/cromwell/backend/backend.scala index 3c5c33ab992..48212ebd798 100644 --- a/backend/src/main/scala/cromwell/backend/backend.scala +++ b/backend/src/main/scala/cromwell/backend/backend.scala @@ -143,6 +143,7 @@ object CommonBackendConfigurationAttributes { "default-runtime-attributes.awsBatchRetryAttempts", "default-runtime-attributes.maxRetries", "default-runtime-attributes.awsBatchEvaluateOnExit", + "default-runtime-attributes.sharedMemorySize", "default-runtime-attributes.ulimits", "default-runtime-attributes.efsDelocalize", "default-runtime-attributes.efsMakeMD5", diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala index 8acef722f71..876473f7bf7 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala @@ -38,7 +38,6 @@ import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpres import wom.expression.WomExpression import wom.graph.LocalName import wom.values._ -import wom.types._ import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper} import java.net.URLDecoder import scala.concurrent._ @@ -171,7 +170,7 @@ trait StandardAsyncExecutionActor // decode url encoded values generated by the mapper val decoded_womValue = URLDecoder.decode(mapped_wom.valueString,"UTF-8") // convert to womfile again - WomFile(WomSingleFileType, decoded_womValue) + WomFile(womFile.womFileType, decoded_womValue) } ) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index 150d61bef79..72b4c922642 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -47,7 +47,7 @@ import common.util.StringUtil._ import common.validation.Validation._ import cromwell.backend._ -import cromwell.backend.async._ +import cromwell.backend.async._ import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus} import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus} @@ -84,9 +84,9 @@ import scala.util.control.NoStackTrace import scala.util.{Success, Try, Failure} /** - * The `AwsBatchAsyncBackendJobExecutionActor` creates and manages a job. The job itself is encapsulated by the - * functionality in `AwsBatchJob` - */ + * The `AwsBatchAsyncBackendJobExecutionActor` creates and manages a job. 
The job itself is encapsulated by the
+  * functionality in `AwsBatchJob`
+  */
 object AwsBatchAsyncBackendJobExecutionActor {
 
   val AwsBatchOperationIdKey = "__aws_batch_operation_id"
 
@@ -98,8 +98,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
   with KvClient
   with AskSupport {
 
   /**
-   * The builder for `IoCommands` to the storage system used by jobs executed by this backend
-   */
+    * The builder for `IoCommands` to the storage system used by jobs executed by this backend
+    */
   override lazy val ioCommandBuilder: IoCommandBuilder = configuration.fileSystem match {
     case AWSBatchStorageSystems.s3 => S3BatchCommandBuilder
     case _ => DefaultIoCommandBuilder
   }
@@ -119,11 +119,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
   override type StandardAsyncRunState = RunStatus
 
   /**
-   * Determines if two run statuses are equal
-   * @param thiz a `RunStatus`
-   * @param that a `RunStatus`
-   * @return true if they are equal, else false
-   */
+    * Determines if two run statuses are equal
+    * @param thiz a `RunStatus`
+    * @param that a `RunStatus`
+    * @return true if they are equal, else false
+    */
   def statusEquivalentTo(thiz: StandardAsyncRunState)(that: StandardAsyncRunState): Boolean = thiz == that
 
   override lazy val pollBackOff: SimpleExponentialBackoff = SimpleExponentialBackoff(1.second, 5.minutes, 1.1)
@@ -178,13 +178,13 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
    * commandScriptContents here
    */
 
-  /* part of the full commandScriptContents is overriden here, in the context of mixed S3/EFS support with globbing.
-     we'll see how much we need...
-  */
+  /* part of the full commandScriptContents is overridden here, in the context of mixed S3/EFS support with globbing.
+     we'll see how much we need...
+  */
 
   lazy val cmdScript = configuration.fileSystem match {
-    case AWSBatchStorageSystems.s3 => commandScriptContents.toEither.toOption.get
-    case _ => execScript
+    case AWSBatchStorageSystems.s3 => commandScriptContents.toEither.toOption.get
+    case _ => execScript
   }
 
   lazy val batchJob: AwsBatchJob = {
@@ -229,13 +229,13 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
   override def requestsAbortAndDiesImmediately: Boolean = false
 
   /**
-   * Takes two arrays of remote and local WOM File paths and generates the necessary AwsBatchInputs.
-   */
+    * Takes two arrays of remote and local WOM File paths and generates the necessary AwsBatchInputs.
+    */
   private def inputsFromWomFiles(namePrefix: String,
-                                 remotePathArray: Seq[WomFile],
-                                 localPathArray: Seq[WomFile],
-                                 jobDescriptor: BackendJobDescriptor,
-                                 flag: Boolean): Iterable[AwsBatchInput] = {
+                                  remotePathArray: Seq[WomFile],
+                                  localPathArray: Seq[WomFile],
+                                  jobDescriptor: BackendJobDescriptor,
+                                  flag: Boolean): Iterable[AwsBatchInput] = {
 
     (remotePathArray zip localPathArray zipWithIndex) flatMap {
       case ((remotePath, localPath), index) =>
@@ -247,38 +247,38 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
         }
         Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk))
     }
-
+
   }
 
   /**
-   * Turns WomFiles into relative paths. These paths are relative to the working disk.
-   *
-   * relativeLocalizationPath("foo/bar.txt") -> "foo/bar.txt"
-   * relativeLocalizationPath("s3://some/bucket/foo.txt") -> "some/bucket/foo.txt"
-   */
+    * Turns WomFiles into relative paths. These paths are relative to the working disk. 
+ * + * relativeLocalizationPath("foo/bar.txt") -> "foo/bar.txt" + * relativeLocalizationPath("s3://some/bucket/foo.txt") -> "some/bucket/foo.txt" + */ override protected def relativeLocalizationPath(file: WomFile): WomFile = { file.mapFile(value => getPath(value) match { // for s3 paths : case Success(path: S3Path) => configuration.fileSystem match { - case AWSBatchStorageSystems.s3 => - URLDecoder.decode(path.pathWithoutScheme,"UTF-8") - case _ => - URLDecoder.decode(path.toString,"UTF-8") + case AWSBatchStorageSystems.s3 => + URLDecoder.decode(path.pathWithoutScheme,"UTF-8") + case _ => + URLDecoder.decode(path.toString,"UTF-8") } // non-s3 paths - case _ => - URLDecoder.decode(value,"UTF-8") + case _ => + URLDecoder.decode(value,"UTF-8") } ) } /** - * Generate a set of inputs based on a job description - * @param jobDescriptor the job descriptor from Cromwell - * @return the inputs derived from the descriptor - */ + * Generate a set of inputs based on a job description + * @param jobDescriptor the job descriptor from Cromwell + * @return the inputs derived from the descriptor + */ private[aws] def generateAwsBatchInputs(jobDescriptor: BackendJobDescriptor): Set[AwsBatchInput] = { val writeFunctionFiles = instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f) } toMap @@ -301,7 +301,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case womFile: WomFile => womFile } } - + val callInputInputs = callInputFiles flatMap { case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true) } @@ -317,11 +317,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } /** - * Given a path (relative or absolute), returns a (Path, AwsBatchVolume) tuple where the Path is - * relative to the Volume's mount point - * - * @throws Exception if the `path` does not live in one of the supplied `disks` - */ + * Given a path (relative or absolute), returns a (Path, AwsBatchVolume) tuple where the Path is + * relative to the Volume's mount point + * + * @throws Exception if the `path` does not live in one of the supplied `disks` + */ private def relativePathAndVolume(path: String, disks: Seq[AwsBatchVolume] ): (Path, AwsBatchVolume) = { def getAbsolutePath(path: Path) = { @@ -330,7 +330,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case _ => AwsBatchWorkingDisk.MountPoint.resolve(path) } } - + val absolutePath = DefaultPathBuilder.get(path) match { case p if !p.isAbsolute => getAbsolutePath(p) case p => p @@ -343,10 +343,10 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } /** - * Produces names with a length less than 128 characters possibly by producing a digest of the name - * @param referenceName the name to make safe - * @return the name or the MD5sum of that name if the name is >= 128 characters - */ + * Produces names with a length less than 128 characters possibly by producing a digest of the name + * @param referenceName the name to make safe + * @return the name or the MD5sum of that name if the name is >= 128 characters + */ private def makeSafeAwsBatchReferenceName(referenceName: String) = { if (referenceName.length <= 127) referenceName else referenceName.md5Sum } @@ -357,7 +357,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar Try( output.expression.evaluateFiles(jobDescriptor.localInputs, NoIoFunctionSet, output.womType).map(_.toList map { _.file }) 
).getOrElse(List.empty[WomFile].validNel) - .getOrElse(List.empty) + .getOrElse(List.empty) } val womFileOutputs = jobDescriptor.taskCall.callable.outputs.flatMap(evaluateFiles) map relativeLocalizationPath val outputs: Seq[AwsBatchFileOutput] = womFileOutputs.distinct flatMap { @@ -369,7 +369,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } val additionalGlobOutput = jobDescriptor.taskCall.callable.additionalGlob.toList.flatMap(generateAwsBatchGlobFileOutputs).toSet - + outputs.toSet ++ additionalGlobOutput } @@ -404,7 +404,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // used by generateAwsBatchOutputs, could potentially move this def within that function private def generateAwsBatchSingleFileOutputs(womFile: WomSingleFile): List[AwsBatchFileOutput] = { // rewrite this to create more flexibility - // + // val destination = configuration.fileSystem match { case AWSBatchStorageSystems.s3 => callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString case _ => DefaultPathBuilder.get(womFile.valueString) match { @@ -414,19 +414,21 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } val (relpath, disk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks) - - val output = if (configuration.efsMntPoint.isDefined && - configuration.efsMntPoint.getOrElse("").equals(disk.toString.split(" ")(1)) && - ! runtimeAttributes.efsDelocalize) { - // name: String, s3key: String, local: Path, mount: AwsBatchVolume - AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), womFile.value, relpath, disk) - } else { - // if efs is not enabled, OR efs delocalization IS enabled, keep the s3 path as destination. - AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), URLDecoder.decode(destination,"UTF-8"), relpath, disk) - } + + val output = if (configuration.efsMntPoint.isDefined && + configuration.efsMntPoint.getOrElse("").equals(disk.toString.split(" ")(1)) && + ! runtimeAttributes.efsDelocalize) { + // name: String, s3key: String, local: Path, mount: AwsBatchVolume + AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), womFile.value, relpath, disk) + } else { + // if efs is not enabled, OR efs delocalization IS enabled, keep the s3 path as destination. + AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), URLDecoder.decode(destination,"UTF-8"), relpath, disk) + } List(output) } - + + + // get a unique glob name locations & paths. // 1. globName :md5 hash of local PATH and WF_ID // 2. globbedDir : local path of the directory being globbed. @@ -437,10 +439,17 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // add workflow id to hash for better conflict prevention val wfid = standardParams.jobDescriptor.toString.split(":")(0) val globName = GlobFunctions.globName(s"${womFile.value}-${wfid}") - val globbedDir = Paths.get(womFile.value).getParent.toString + + val globbedDir = Paths.get(womFile.value).getParent match { + // remove ./ so it does not appear on s3 path + case path: Path => path.toString.stripPrefix(".") + case _ => "" + } + // generalize folder and list file - val globDirectory = DefaultPathBuilder.get(globbedDir + "/." + globName + "/") - val globListFile = DefaultPathBuilder.get(globbedDir + "/." 
+ globName + ".list") + val globDirPrefix = s"${globbedDir}/.${globName}".stripPrefix("/"); + val globDirectory = DefaultPathBuilder.get(globDirPrefix + "/") + val globListFile = DefaultPathBuilder.get(globDirPrefix + ".list") // locate the disk where the globbed data resides val (_, globDirectoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks) @@ -456,26 +465,26 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } // return results return ( - globName, - globbedDir, - globDirectoryDisk, - globDirectoryDestinationPath.toString, - globListFileDestinationPath.toString + globName, + globbedDir, + globDirectoryDisk, + globDirectoryDestinationPath.toString, + globListFileDestinationPath.toString ) - + } // used by generateAwsBatchOutputs, could potentially move this def within that function private def generateAwsBatchGlobFileOutputs(womFile: WomGlobFile): List[AwsBatchFileOutput] = { - + val (globName, globbedDir, globDirectoryDisk, globDirectoryDestinationPath, globListFileDestinationPath) = generateGlobPaths(womFile) - val (relpathDir,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + "/" + "*").toString,runtimeAttributes.disks) - val (relpathList,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + ".list").toString,runtimeAttributes.disks) + val (relpathDir,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + "/" + "*").toString.stripPrefix("/"),runtimeAttributes.disks) + val (relpathList,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + ".list").toString.stripPrefix("/"),runtimeAttributes.disks) // We need both the glob directory and the glob list: List( // The glob directory:. - AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + "/" + "*").toString,globDirectoryDestinationPath, relpathDir, globDirectoryDisk), + AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + "/" + "*").toString.stripPrefix("/"),globDirectoryDestinationPath, relpathDir, globDirectoryDisk), // The glob list file: - AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + ".list").toString, globListFileDestinationPath, relpathList, globDirectoryDisk) + AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." 
+ globName + ".list").toString.stripPrefix("/"), globListFileDestinationPath, relpathList, globDirectoryDisk) ) } @@ -514,9 +523,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } /** - * Asynchronously upload the command script to the script path - * @return a `Future` for the asynch operation - */ + * Asynchronously upload the command script to the script path + * @return a `Future` for the asynch operation + */ def uploadScriptFile(): Future[Unit] = { commandScriptContents.fold( errors => Future.failed(new RuntimeException(errors.toList.mkString(", "))), @@ -589,8 +598,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } override def handleExecutionResult(status: StandardAsyncRunState, - oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = { - + oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = { + // get path to sderr val stderr = jobPaths.standardPaths.error @@ -614,16 +623,16 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 => val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption), Option(returnCodeAsInt), None)) retryElseFail(executionHandle) - // job was aborted (cancelled by user?) - // on AWS OOM kill are code 137 : check retryWithMoreMemory here + // job was aborted (cancelled by user?) + // on AWS OOM kill are code 137 : check retryWithMoreMemory here case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory => jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString.stripLineEnd}'") Future.successful(AbortedExecutionHandle) // if instance killed after RC.txt creation : edge case with status == Failed AND returnCode == [accepted values] => retry. case Success(returnCodeAsInt) if status.toString() == "Failed" && continueOnReturnCode.continueFor(returnCodeAsInt) => - jobLogger.debug(s"Suspected spot kill due to status/RC mismatch") - val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(UnExpectedStatus(jobDescriptor.key.tag, returnCodeAsInt, status.toString(), stderrAsOption), Option(returnCodeAsInt), None)) - retryElseFail(executionHandle) + jobLogger.debug(s"Suspected spot kill due to status/RC mismatch") + val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(UnExpectedStatus(jobDescriptor.key.tag, returnCodeAsInt, status.toString(), stderrAsOption), Option(returnCodeAsInt), None)) + retryElseFail(executionHandle) // job considered ok by accepted exit code case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) => handleExecutionSuccess(status, oldHandle, returnCodeAsInt) @@ -660,96 +669,96 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar retryElseFail(failureStatus) } } - - + + } - - - // get the exit code of the job. - def JobExitCode: Future[String] = { - - // read if the file exists - def readRCFile(fileExists: Boolean): Future[String] = { - if (fileExists) - asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false) - else { - jobLogger.warn("RC file not found in aws version. 
Setting job to failed.") - //Thread.sleep(300000) - Future("1") - } - } - //finally : assign the yielded variable - for { - fileExists <- asyncIo.existsAsync(jobPaths.returnCode) - jobRC <- readRCFile(fileExists) - } yield jobRC - } - - // new OOM detection - def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = Future { - // STATUS LOGIC: - // - success : container exit code is zero - // - command failure: container exit code > 0, no statusReason in container - // - OOM kill : container exit code > 0, statusReason contains "OutOfMemory" OR exit code == 137 - // - spot kill : no container exit code set. statusReason of ATTEMPT (not container) says "host EC2 (...) terminated" - Log.debug(s"Looking for memoryRetry in job '${job.jobId}'") - val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build) - val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'")) - val nrAttempts = jobDetail.attempts.size - // if job is terminated/cancelled before starting, there are no attempts. - val lastattempt = - try { - jobDetail.attempts.get(nrAttempts-1) - } catch { - case _ : Throwable => null - } - if (lastattempt == null ) { - Log.info(s"No attempts were made for job '${job.jobId}'. no memory-related retry needed.") - false - } - var containerRC = - try { - lastattempt.container.exitCode - } catch { - case _ : Throwable => null - } - // if missing, set to failed. - if (containerRC == null ) { - Log.debug(s"No RC found for job '${job.jobId}', most likely a spot kill") - containerRC = 1 + + + // get the exit code of the job. + def JobExitCode: Future[String] = { + + // read if the file exists + def readRCFile(fileExists: Boolean): Future[String] = { + if (fileExists) + asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false) + else { + jobLogger.warn("RC file not found in aws version. Setting job to failed.") + //Thread.sleep(300000) + Future("1") } - // if not zero => get reason, else set retry to false. - containerRC.toString() match { - case "0" => - Log.debug("container exit code was zero. job succeeded") - false - case "137" => - Log.info("Job failed with Container status reason : 'OutOfMemory' (code:137)") - true - case _ => - // failed job due to command errors (~ user errors) don't have a container exit reason. - val containerStatusReason:String = { - var lastReason = lastattempt.container.reason - // cast null to empty-string to prevent nullpointer exception. - if (lastReason == null || lastReason.isEmpty) { - lastReason = "" - log.debug("No exit reason found for container.") - } else { - Log.warn(s"Job failed with Container status reason : '${lastReason}'") - } - lastReason - } - // check the list of OOM-keys against the exit reason. 
- val RetryMemoryKeys = memoryRetryErrorKeys.toList.flatten - val retry = RetryMemoryKeys.exists(containerStatusReason.contains) - Log.debug(s"Retry job based on provided keys : '${retry}'") - retry + } + //finally : assign the yielded variable + for { + fileExists <- asyncIo.existsAsync(jobPaths.returnCode) + jobRC <- readRCFile(fileExists) + } yield jobRC + } + + // new OOM detection + def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = Future { + // STATUS LOGIC: + // - success : container exit code is zero + // - command failure: container exit code > 0, no statusReason in container + // - OOM kill : container exit code > 0, statusReason contains "OutOfMemory" OR exit code == 137 + // - spot kill : no container exit code set. statusReason of ATTEMPT (not container) says "host EC2 (...) terminated" + Log.debug(s"Looking for memoryRetry in job '${job.jobId}'") + val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build) + val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'")) + val nrAttempts = jobDetail.attempts.size + // if job is terminated/cancelled before starting, there are no attempts. + val lastattempt = + try { + jobDetail.attempts.get(nrAttempts-1) + } catch { + case _ : Throwable => null + } + if (lastattempt == null ) { + Log.info(s"No attempts were made for job '${job.jobId}'. no memory-related retry needed.") + false + } + var containerRC = + try { + lastattempt.container.exitCode + } catch { + case _ : Throwable => null } - - - } + // if missing, set to failed. + if (containerRC == null ) { + Log.debug(s"No RC found for job '${job.jobId}', most likely a spot kill") + containerRC = 1 + } + // if not zero => get reason, else set retry to false. + containerRC.toString() match { + case "0" => + Log.debug("container exit code was zero. job succeeded") + false + case "137" => + Log.info("Job failed with Container status reason : 'OutOfMemory' (code:137)") + true + case _ => + // failed job due to command errors (~ user errors) don't have a container exit reason. + val containerStatusReason:String = { + var lastReason = lastattempt.container.reason + // cast null to empty-string to prevent nullpointer exception. + if (lastReason == null || lastReason.isEmpty) { + lastReason = "" + log.debug("No exit reason found for container.") + } else { + Log.warn(s"Job failed with Container status reason : '${lastReason}'") + } + lastReason + } + // check the list of OOM-keys against the exit reason. + val RetryMemoryKeys = memoryRetryErrorKeys.toList.flatten + val retry = RetryMemoryKeys.exists(containerStatusReason.contains) + Log.debug(s"Retry job based on provided keys : '${retry}'") + retry + } + + + } + - // Despite being a "runtime" exception, BatchExceptions for 429 (too many requests) are *not* fatal: @@ -803,7 +812,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar runStatus match { case successStatus: RunStatus.Succeeded => successStatus.eventList case unknown => { - throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown") + throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. 
Instead got $unknown") } } } @@ -824,26 +833,26 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } ) } - + override def handleExecutionSuccess(runStatus: StandardAsyncRunState, - handle: StandardAsyncPendingExecutionHandle, - returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = { + handle: StandardAsyncPendingExecutionHandle, + returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = { evaluateOutputs() map { case ValidJobOutputs(outputs) => // Need to make sure the paths are up to date before sending the detritus back in the response updateJobPaths() // If instance is terminated while copying stdout/stderr : status is failed while jobs outputs are ok - // => Retryable + // => Retryable if (runStatus.toString().equals("Failed")) { - jobLogger.warn("Got Failed RunStatus for success Execution") + jobLogger.warn("Got Failed RunStatus for success Execution") - val exception = new MessageAggregation { - override def exceptionContext: String = "Got Failed RunStatus for success Execution" - override def errorMessages: Iterable[String] = Array("Got Failed RunStatus for success Execution") - } - FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None) + val exception = new MessageAggregation { + override def exceptionContext: String = "Got Failed RunStatus for success Execution" + override def errorMessages: Iterable[String] = Array("Got Failed RunStatus for success Execution") + } + FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None) } else { - SuccessfulExecutionHandle(outputs, returnCode, jobPaths.detritusPaths, getTerminalEvents(runStatus)) + SuccessfulExecutionHandle(outputs, returnCode, jobPaths.detritusPaths, getTerminalEvents(runStatus)) } case InvalidJobOutputs(errors) => val exception = new MessageAggregation { @@ -862,42 +871,42 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // overrides for globbing /** - * Returns the shell scripting for linking a glob results file. - * - * @param globFile The glob. - * @return The shell scripting. - */ + * Returns the shell scripting for linking a glob results file. + * + * @param globFile The glob. + * @return The shell scripting. + */ override def globScript(globFile: WomGlobFile): String = { - - val (globName, globbedDir, _, _, _) = generateGlobPaths(globFile) + + val (globName, globbedDir, _, _, _) = generateGlobPaths(globFile) val controlFileName = "cromwell_glob_control_file" val absoluteGlobValue = commandDirectory.resolve(globFile.value).pathAsString - val globDirectory = globbedDir + "/." + globName + "/" - val globList = globbedDir + "/." + globName + ".list" + val globDirectory = (globbedDir + "/." + globName + "/").stripPrefix("/") + val globList = (globbedDir + "/." + globName + ".list").stripPrefix("/") val globLinkCommand: String = (if (configuration.globLinkCommand.isDefined) { - "( " + configuration.globLinkCommand.getOrElse("").toString + " )" + "( " + configuration.globLinkCommand.getOrElse("").toString + " )" } else { - "( ln -L GLOB_PATTERN GLOB_DIRECTORY 2> /dev/null ) || ( ln GLOB_PATTERN GLOB_DIRECTORY )" + "( ln -L GLOB_PATTERN GLOB_DIRECTORY 2> /dev/null ) || ( ln GLOB_PATTERN GLOB_DIRECTORY )" }).toString .replaceAll("GLOB_PATTERN", absoluteGlobValue) .replaceAll("GLOB_DIRECTORY", globDirectory) - // if on EFS : remove the globbing dir first, to remove leftover links from previous globs. + // if on EFS : remove the globbing dir first, to remove leftover links from previous globs. 
val mkDirCmd : String = if (configuration.efsMntPoint.isDefined && globDirectory.startsWith(configuration.efsMntPoint.getOrElse(""))) {
-        jobLogger.warn("Globbing on EFS has risks.")
-        jobLogger.warn(s"The globbing target (${globbedDir}/.${globName}/) will be overwritten when existing!")
-        jobLogger.warn("Consider keeping globbed outputs in the cromwell-root folder")
-        s"rm -Rf $globDirectory $globList && mkdir"
+      jobLogger.warn("Globbing on EFS has risks.")
+      jobLogger.warn(s"The globbing target (${globbedDir}/.${globName}/) will be overwritten if it already exists!")
+      jobLogger.warn("Consider keeping globbed outputs in the cromwell-root folder")
+      s"rm -Rf $globDirectory $globList && mkdir"
     } else {
-        "mkdir"
+      "mkdir"
     }
-
+
     val controlFileContent = """This file is used by Cromwell to allow for globs that would not match any file.
       |By its presence it works around the limitation of some backends that do not allow empty globs.
      |Regardless of the outcome of the glob, this file will not be part of the final list of globbed files.
      """.stripMargin
-
+
     s"""|# make the directory which will keep the matching files
        |$mkDirCmd $globDirectory
        |
@@ -908,7 +917,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
        |$globLinkCommand
        |
        |# list all the files (except the control file) that match the glob into a file called glob-[md5 of glob].list
-       |ls -1 $globDirectory | grep -v $controlFileName > $globList
+       |ls -1 $globDirectory | grep -v $controlFileName > ${globList.stripPrefix("./")}
        |""".stripMargin
   }
 }
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
index 4cfa08e82a5..bfb7b82c910 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
@@ -138,7 +138,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
       case input: AwsBatchFileInput if input.s3key.startsWith("s3://") =>
         // regular s3 objects : download to working dir.
-        s"""_s3_localize_with_retry "${input.s3key}" "${input.mount.mountPoint.pathAsString}/${input.local}" """.stripMargin
+        s"""_s3_localize_with_retry "${input.s3key}" "${input.mount.mountPoint.pathAsString}/${input.local}"""".stripMargin
           .replace(AwsBatchWorkingDisk.MountPoint.pathAsString, workDir)
 
       case input: AwsBatchFileInput if efsMntPoint.isDefined && input.s3key.startsWith(efsMntPoint.get) =>
@@ -170,6 +170,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
     val workflowId = invalidCharsPattern.replaceAllIn(jobDescriptor.workflowDescriptor.rootWorkflowId.toString,"_")
     val workflowName = invalidCharsPattern.replaceAllIn(jobDescriptor.workflowDescriptor.rootWorkflow.name.toString,"_")
+    val taskId = invalidCharsPattern.replaceAllIn(jobDescriptor.key.call.fullyQualifiedName + "-" + jobDescriptor.key.index + "-" + jobDescriptor.key.attempt,"_")
     val doTagging = tagResources.getOrElse(false)
     // development : always tag resources. 
//val tags: Map[String,String] = Map("cromwell-workflow-name" -> workflowName, "cromwell-workflow-id" -> workflowId, "cromwell-task-id" -> taskId) @@ -202,7 +203,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" || | { echo "attempt $$i to copy $$s3_path failed" && sleep $$((7 * "$$i")) && continue; } | # check data integrity - | _check_data_integrity "$$destination" "$$s3_path" || + | _check_data_integrity "$$destination" "$$s3_path" || | { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | # copy succeeded | break @@ -239,7 +240,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | break | fi | # if destination is not a bucket : abort - | if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then + | if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then | echo "$$destination is not an S3 path with a bucket and key." | DELOCALIZATION_FAILED=1 | break @@ -249,21 +250,21 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | # make sure to strip the trailing / in destination | destination=$${destination%/} | # glob directory. do recursive copy - | $awsCmd s3 cp --no-progress "$$local_path" "$$destination" --recursive --exclude "cromwell_glob_control_file" || - | { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; } + | $awsCmd s3 cp --no-progress "$$local_path" "$$destination" --recursive --exclude "cromwell_glob_control_file" || + | { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | # check integrity for each of the files (allow spaces) | SAVEIFS="$$IFS" | IFS=$$'\n' | for FILE in $$(cd "$$local_path" ; ls | grep -v cromwell_glob_control_file); do - | _check_data_integrity "$$local_path/$$FILE" "$$destination/$$FILE" || + | _check_data_integrity "$$local_path/$$FILE" "$$destination/$$FILE" || | { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; } | done | IFS="$$SAVEIFS" - | else - | $awsCmd s3 cp --no-progress "$$local_path" "$$destination" || - | { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } + | else + | $awsCmd s3 cp --no-progress "$$local_path" "$$destination" || + | { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | # check content length for data integrity - | _check_data_integrity "$$local_path" "$$destination" || + | _check_data_integrity "$$local_path" "$$destination" || | { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | fi | # copy succeeded @@ -274,7 +275,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |function _get_multipart_chunk_size() { | local file_path="$$1" | # file size - | file_size=$$(stat --printf="%s" "$$file_path") + | file_size=$$(stat --printf="%s" "$$file_path") | # chunk_size : you can have at most 10K parts with at least one 5MB part | # this reflects the formula in s3-copy commands of cromwell (S3FileSystemProvider.java) | # => long partSize = Math.max((objectSize / 10000L) + 1, 5 * 1024 * 1024); @@ -287,9 +288,9 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |function _check_data_integrity() { | local local_path="$$1" | local 
s3_path="$$2" - | + | | # remote : use content_length - | if [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then + | if [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then | bucket="$${BASH_REMATCH[1]}" | key="$${BASH_REMATCH[2]}" | else @@ -356,7 +357,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |if [[ "${doTagging}" == "true" ]]; then | echo "*** TAGGING RESOURCES ***" | _add_tags - |fi + |fi | |echo '*** LOCALIZING INPUTS ***' |if [ ! -d $workDir ]; then mkdir $workDir && chmod 777 $workDir; fi @@ -367,7 +368,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |if [[ $$LOCALIZATION_FAILED -eq 1 ]]; then | echo '*** LOCALIZATION FAILED ***' | exit 1 - |else + |else | echo '*** COMPLETED LOCALIZATION ***' |fi |set +e @@ -451,7 +452,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString => //output is on working disk mount Log.debug("output Data on working disk mount" + output.local.pathAsString) - s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" """.stripMargin + s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}"""".stripMargin // file(name (full path), s3key (delocalized path), local (file basename), mount (disk details)) // files on EFS mounts are optionally delocalized. @@ -481,6 +482,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL } // return combined result s""" + | # changes made |${test_cmd} |${md5_cmd} | """.stripMargin @@ -489,10 +491,11 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL //output on a different mount Log.debug("output data on other mount") Log.debug(output.toString) - s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" """.stripMargin + s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}"""".stripMargin case _ => "" }.mkString("\n") + "\n" + s""" + | # there hsould be some change here |if [ -f "$workDir/${jobPaths.returnCodeFilename}" ]; then _s3_delocalize_with_retry "$workDir/${jobPaths.returnCodeFilename}" "${jobPaths.callRoot.pathAsString}/${jobPaths.returnCodeFilename}" ; fi |if [ -f "$stdErr" ]; then _s3_delocalize_with_retry "$stdErr" "${jobPaths.standardPaths.error.pathAsString}"; fi |if [ -f "$stdOut" ]; then _s3_delocalize_with_retry "$stdOut" "${jobPaths.standardPaths.output.pathAsString}"; fi @@ -508,7 +511,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |if [[ "${doTagging}" == "true" ]]; then | echo "*** TAGGING RESOURCES ***" | _add_tags - |fi + |fi | |echo '*** DELOCALIZING OUTPUTS ***' |DELOCALIZATION_FAILED=0 diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index f55d3a933f7..227b1a37713 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -34,7 +34,7 @@ package cromwell.backend.impl.aws import scala.collection.mutable.ListBuffer import cromwell.backend.BackendJobDescriptor import cromwell.backend.io.JobPaths -import 
software.amazon.awssdk.services.batch.model.{ContainerProperties, EvaluateOnExit, Host, KeyValuePair, MountPoint, ResourceRequirement, ResourceType, RetryAction, RetryStrategy, Ulimit, Volume}
+import software.amazon.awssdk.services.batch.model.{ContainerProperties, EvaluateOnExit, Host, KeyValuePair, LinuxParameters, MountPoint, ResourceRequirement, ResourceType, RetryAction, RetryStrategy, Ulimit, Volume}
 import cromwell.backend.impl.aws.io.AwsBatchVolume
 
 import scala.jdk.CollectionConverters._
@@ -153,8 +153,8 @@ trait AwsBatchJobDefinitionBuilder {
     ).toList
   }
 
-  def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit], efsDelocalize: Boolean, efsMakeMD5: Boolean, tagResources: Boolean): String = {
-    s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}"
+  def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit], efsDelocalize: Boolean, efsMakeMD5: Boolean, tagResources: Boolean, sharedMemorySize: Int): String = {
+    s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:${sharedMemorySize.toString}"
   }
 
   val environment = List.empty[KeyValuePair]
@@ -179,7 +179,8 @@ trait AwsBatchJobDefinitionBuilder {
       ulimits,
       efsDelocalize,
       efsMakeMD5,
-      tagResources
+      tagResources,
+      context.runtimeAttributes.sharedMemorySize.value
     )
 
     // To reuse job definition for gpu and gpu-runs, we will create a job definition that does not gpu requirements
@@ -194,8 +195,12 @@ trait AwsBatchJobDefinitionBuilder {
       .volumes(volumes.asJava)
       .mountPoints(mountPoints.asJava)
       .environment(environment.asJava)
-      .ulimits(ulimits.asJava),
+      .ulimits(ulimits.asJava)
+      .linuxParameters(
+        LinuxParameters.builder().sharedMemorySize(context.runtimeAttributes.sharedMemorySize.value).build()
+      ),
      containerPropsName)
+
   }
 
   def retryStrategyBuilder(context: AwsBatchJobDefinitionContext): (RetryStrategy.Builder, String) = {
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala
index 456355e412c..a50e41e0a5c 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala
@@ -48,7 +48,7 @@ import com.typesafe.config.{ConfigException, ConfigValueFactory}
 import scala.util.matching.Regex
 import org.slf4j.{Logger, LoggerFactory}
 
-import wom.RuntimeAttributesKeys.GpuKey
+import wom.RuntimeAttributesKeys.{GpuKey, sharedMemoryKey}
 
 import scala.util.{Failure, Success, Try}
 import scala.jdk.CollectionConverters._
@@ -91,8 +91,10 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive,
                                      ulimits: Vector[Map[String, String]],
                                      efsDelocalize: Boolean,
                                      efsMakeMD5 : Boolean,
-                                     fileSystem: String= "s3",
-                                     tagResources: Boolean = false)
+                                     sharedMemorySize: Int Refined Positive,
+                                     fileSystem: String = "s3",
+                                     tagResources: Boolean = false
+                                     )
 
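A note on the new attribute before the companion object: AWS Batch's `LinuxParameters.sharedMemorySize` sets the size of the container's `/dev/shm` volume in MiB (the `docker run --shm-size` equivalent), and the refined type `Int Refined Positive` rules out non-positive values at compile time. A minimal sketch of how the refined value is unwrapped for the SDK builder, using only types that appear in this diff (the method name is illustrative):

    import eu.timepit.refined.api.Refined
    import eu.timepit.refined.numeric.Positive
    import software.amazon.awssdk.services.batch.model.LinuxParameters

    // Refined[Int, Positive] is a thin wrapper; .value unwraps the plain Int for the Java SDK.
    def sharedMemoryParameters(sharedMemorySize: Int Refined Positive): LinuxParameters =
      LinuxParameters.builder()
        .sharedMemorySize(sharedMemorySize.value) // MiB, as in the builder change above
        .build()

object 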
AwsBatchRuntimeAttributes { val Log: Logger = LoggerFactory.getLogger(this.getClass) @@ -103,6 +105,8 @@ object AwsBatchRuntimeAttributes { val awsBatchRetryAttemptsKey = "awsBatchRetryAttempts" val awsBatchEvaluateOnExitKey = "awsBatchEvaluateOnExit" + + val defaultSharedMemorySize = WomInteger(64) private val awsBatchEvaluateOnExitDefault = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) @@ -151,6 +155,12 @@ object AwsBatchRuntimeAttributes { MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue) } + private def sharedMemorySizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Refined[Int, Positive]] = { + SharedMemorySizeValidation(sharedMemoryKey).withDefault( + SharedMemorySizeValidation(sharedMemoryKey).configDefaultWomValue(runtimeConfig).getOrElse(defaultSharedMemorySize) + ) + } + private def memoryMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { MemoryValidation.withDefaultMemory( RuntimeAttributesKeys.MemoryMinKey, @@ -243,7 +253,8 @@ object AwsBatchRuntimeAttributes { ulimitsValidation(runtimeConfig), awsBatchefsDelocalizeValidation(runtimeConfig), awsBatchefsMakeMD5Validation(runtimeConfig), - awsBatchtagResourcesValidation(runtimeConfig) + awsBatchtagResourcesValidation(runtimeConfig), + sharedMemorySizeValidation(runtimeConfig), ) def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( cpuValidation(runtimeConfig), @@ -261,8 +272,9 @@ object AwsBatchRuntimeAttributes { ulimitsValidation(runtimeConfig), awsBatchefsDelocalizeValidation(runtimeConfig), awsBatchefsMakeMD5Validation(runtimeConfig), - awsBatchtagResourcesValidation(runtimeConfig) - ) + awsBatchtagResourcesValidation(runtimeConfig), + sharedMemorySizeValidation(runtimeConfig), + ) configuration.fileSystem match { case AWSBatchStorageSystems.s3 => validationsS3backend @@ -293,6 +305,7 @@ object AwsBatchRuntimeAttributes { val efsDelocalize: Boolean = RuntimeAttributesValidation.extract(awsBatchefsDelocalizeValidation(runtimeAttrsConfig),validatedRuntimeAttributes) val efsMakeMD5: Boolean = RuntimeAttributesValidation.extract(awsBatchefsMakeMD5Validation(runtimeAttrsConfig),validatedRuntimeAttributes) val tagResources: Boolean = RuntimeAttributesValidation.extract(awsBatchtagResourcesValidation(runtimeAttrsConfig),validatedRuntimeAttributes) + val sharedMemorySize: Int Refined Positive = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( cpu, gpuCount, @@ -310,8 +323,9 @@ object AwsBatchRuntimeAttributes { ulimits, efsDelocalize, efsMakeMD5, + sharedMemorySize, fileSystem, - tagResources + tagResources, ) } } @@ -664,6 +678,12 @@ class AwsBatchtagResourcesValidation(key: String) extends BooleanRuntimeAttribut override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be a Boolean" } +object SharedMemorySizeValidation { + def apply(key: String): SharedMemorySizeValidation = new SharedMemorySizeValidation(key) +} + +class SharedMemorySizeValidation(key: String) extends PositiveIntRuntimeAttributesValidation(key) + object UlimitsValidation extends RuntimeAttributesValidation[Vector[Map[String, String]]] { override def key: String = AwsBatchRuntimeAttributes.UlimitsKey diff --git 
a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala index ffee6be0918..0b97aa3027a 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/io/AwsBatchGlobFunctions.scala @@ -34,8 +34,9 @@ package cromwell.backend.impl.aws.io import wom.values._ import cromwell.backend.io._ import cromwell.backend.standard._ + import scala.concurrent.Future -import java.nio.file.Paths +import java.nio.file.{Path, Paths} import cromwell.core.path.DefaultPathBuilder @@ -56,7 +57,7 @@ trait AwsBatchGlobFunctions extends GlobFunctions { override def glob(pattern: String): Future[Seq[String]] = { // get access to globName() import GlobFunctions._ - + // GOAL : // - get config (backend / runtime / ...) here to evaluate if efsMntPoint is set & if efs delocalization is set. // - according to those values : write the pattern as s3:// or as local path. @@ -67,22 +68,25 @@ trait AwsBatchGlobFunctions extends GlobFunctions { val wfid_regex = ".{8}-.{4}-.{4}-.{4}-.{12}".r val wfid = callContext.root.toString.split("/").toList.filter(element => wfid_regex.pattern.matcher(element).matches()).lastOption.getOrElse("") val globPatternName = globName(s"${pattern}-${wfid}") - val globbedDir = Paths.get(pattern).getParent.toString + val globbedDir = Paths.get(pattern).getParent match { + // remove "./" to avoid it from appearing in s3 path + case x: Path => x.toString.stripPrefix("./").stripPrefix(".") + case _ => "" + } val listFilePath = if (pattern.startsWith("/mnt/efs/")) { - DefaultPathBuilder.get(globbedDir + "/." + globPatternName + ".list") + DefaultPathBuilder.get(globbedDir + "/." + globPatternName + ".list") } else { - callContext.root.resolve(s"${globbedDir}/.${globPatternName}.list".stripPrefix("/")) + callContext.root.resolve(s".${globPatternName}.list".stripPrefix("/")) } asyncIo.readLinesAsync(listFilePath.toRealPath()) map { lines => lines.toList map { fileName => // again : this should be config based... 
if (pattern.startsWith("/mnt/efs/")) { - s"${globbedDir}/.${globPatternName}/${fileName}" + s"${globbedDir}/.${globPatternName}/${fileName}".stripPrefix("/") } else { - callContext.root.resolve(s"${globbedDir}/.${globPatternName}/${fileName}".stripPrefix("/")).pathAsString + callContext.root.resolve(s".${globPatternName}/${fileName}".stripPrefix("/")).pathAsString.stripPrefix("/") } } } } - } \ No newline at end of file diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala index 98af4f00282..15299f9c25c 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala @@ -749,7 +749,7 @@ class AwsBatchAsyncBackendJobExecutionActorSpec extends TestKitSuite WomSingleFile("/cromwell_root/path/to/file2"), WomSingleFile("/cromwell_root/path/to/file3"))), WomMap(WomMapType(WomSingleFileType, WomSingleFileType), Map( WomSingleFile("/cromwell_root/path/to/file4") -> WomSingleFile("/cromwell_root/path/to/file5") - )) + )), ) val workflowDescriptor = BackendWorkflowDescriptor( diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAttributesSpec.scala index e3a6d099e66..99c830b0579 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAttributesSpec.scala @@ -39,6 +39,7 @@ import cromwell.core.Tags._ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +s class AwsBatchAttributesSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers { import AwsBatchTestConfig._ diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala index 61914626ed7..4d071c67eaf 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala @@ -42,11 +42,12 @@ import cromwell.util.SampleWdl import eu.timepit.refined.api.Refined import eu.timepit.refined.auto._ import eu.timepit.refined.numeric._ +import eu.timepit.refined.refineMV import org.scalatest.PrivateMethodTester import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider -import software.amazon.awssdk.services.batch.model.{ContainerDetail, EvaluateOnExit, JobDetail, KeyValuePair, ResourceRequirement, RetryAction, RetryStrategy} +import software.amazon.awssdk.services.batch.model.{ContainerDetail, EvaluateOnExit, JobDetail, KeyValuePair, LinuxParameters, ResourceRequirement, RetryAction, RetryStrategy} import spray.json.{JsObject, JsString} import wdl4s.parser.MemoryUnit import wom.format.MemorySize @@ -73,6 +74,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi | cd /cromwell_root | | + | | echo "Hello World! Welcome to Cromwell . . . on AWS!" >&2 |) > '/cromwell_root/hello-stdout.log' 2> '/cromwell_root/hello-stderr.log' |echo $? 
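A remark on the spec fixtures that follow: `val sharedMemorySize: Int Refined Positive = 64` compiles because `eu.timepit.refined.auto._` checks and lifts the integer literal at compile time; the newly imported `refineMV` is the explicit alternative for literals. A standalone sketch, separate from the diff itself:

    import eu.timepit.refined.api.Refined
    import eu.timepit.refined.auto._
    import eu.timepit.refined.numeric.Positive
    import eu.timepit.refined.refineMV

    val shm: Int Refined Positive = 64                       // literal lifted by refined.auto
    val shm2: Int Refined Positive = refineMV[Positive](64)  // explicit equivalent
    // val bad: Int Refined Positive = 0                     // would not compile: 0 is not positive

diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala
index 61914626ed7..4d071c67eaf 100644
--- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala
+++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala
@@ -42,11 +42,12 @@ import cromwell.util.SampleWdl
 import eu.timepit.refined.api.Refined
 import eu.timepit.refined.auto._
 import eu.timepit.refined.numeric._
+import eu.timepit.refined.refineMV
 import org.scalatest.PrivateMethodTester
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.matchers.should.Matchers
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
-import software.amazon.awssdk.services.batch.model.{ContainerDetail, EvaluateOnExit, JobDetail, KeyValuePair, ResourceRequirement, RetryAction, RetryStrategy}
+import software.amazon.awssdk.services.batch.model.{ContainerDetail, EvaluateOnExit, JobDetail, KeyValuePair, LinuxParameters, ResourceRequirement, RetryAction, RetryStrategy}
 import spray.json.{JsObject, JsString}
 import wdl4s.parser.MemoryUnit
 import wom.format.MemorySize
@@ -73,6 +74,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     |  cd /cromwell_root
     |
     |
+    |
     |  echo "Hello World! Welcome to Cromwell . . . on AWS!" >&2
     |) > '/cromwell_root/hello-stdout.log' 2> '/cromwell_root/hello-stderr.log'
     |echo $? 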
> /cromwell_root/hello-rc.txt.tmp @@ -102,13 +104,14 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi val call: CommandCallNode = workFlowDescriptor.callable.taskCallNodes.head val jobKey: BackendJobDescriptorKey = BackendJobDescriptorKey(call, None, 1) - val jobDescriptor: BackendJobDescriptor = BackendJobDescriptor(null, null, null, Map.empty, null, null, null) + val jobDescriptor: BackendJobDescriptor = BackendJobDescriptor(workFlowDescriptor, jobKey, null, Map.empty, null, null, null) val jobPaths: AwsBatchJobPaths = AwsBatchJobPaths(workflowPaths, jobKey) val s3Inputs: Set[AwsBatchInput] = Set(AwsBatchFileInput("foo", "s3://bucket/foo", DefaultPathBuilder.get("foo"), AwsBatchWorkingDisk())) val s3Outputs: Set[AwsBatchFileOutput] = Set(AwsBatchFileOutput("baa", "s3://bucket/somewhere/baa", DefaultPathBuilder.get("baa"), AwsBatchWorkingDisk())) val cpu: Int Refined Positive = 2 + val sharedMemorySize: Int Refined Positive = 64 val runtimeAttributes: AwsBatchRuntimeAttributes = new AwsBatchRuntimeAttributes( cpu = cpu, gpuCount = 0, @@ -126,37 +129,38 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi ulimits = Vector(Map.empty[String, String]), efsDelocalize = false, efsMakeMD5 = false, - fileSystem = "s3") + fileSystem = "s3", + sharedMemorySize = sharedMemorySize + ) val batchJobDefintion = AwsBatchJobDefinitionContext( runtimeAttributes = runtimeAttributes, commandText = "", dockerRcPath = "", dockerStdoutPath = "", dockerStderrPath = "", jobDescriptor = jobDescriptor - , jobPaths = jobPaths, inputs = Set(), outputs = Set(), fsxMntPoint = None, None, None, None - + , jobPaths = jobPaths, inputs = Set(), outputs = Set(), fsxMntPoint = None, None, None, None, None ) val containerDetail: ContainerDetail = ContainerDetail.builder().exitCode(0).build() val jobDetail: JobDetail = JobDetail.builder().container(containerDetail).build private def generateBasicJob: AwsBatchJob = { - val job = AwsBatchJob(null, runtimeAttributes, "commandLine", script, + val job = AwsBatchJob(jobDescriptor, runtimeAttributes, "commandLine", script, "/cromwell_root/hello-rc.txt", "/cromwell_root/hello-stdout.log", "/cromwell_root/hello-stderr.log", Seq.empty[AwsBatchInput].toSet, Seq.empty[AwsBatchFileOutput].toSet, - jobPaths, Seq.empty[AwsBatchParameter], None, None, None, None, None, None) + jobPaths, Seq.empty[AwsBatchParameter], None, None, None, None, None, None, None) job } private def generateBasicJobForLocalFS: AwsBatchJob = { - val job = AwsBatchJob(null, runtimeAttributes.copy(fileSystem="local"), "commandLine", script, + val job = AwsBatchJob(jobDescriptor, runtimeAttributes.copy(fileSystem="local"), "commandLine", script, "/cromwell_root/hello-rc.txt", "/cromwell_root/hello-stdout.log", "/cromwell_root/hello-stderr.log", Seq.empty[AwsBatchInput].toSet, Seq.empty[AwsBatchFileOutput].toSet, - jobPaths, Seq.empty[AwsBatchParameter], None, None, None, None, None, None) + jobPaths, Seq.empty[AwsBatchParameter], None, None, None, None, None, None, None) job } private def generateJobWithS3InOut: AwsBatchJob = { - val job = AwsBatchJob(null, runtimeAttributes, "commandLine", script, + val job = AwsBatchJob(jobDescriptor, runtimeAttributes, "commandLine", script, "/cromwell_root/hello-rc.txt", "/cromwell_root/hello-stdout.log", "/cromwell_root/hello-stderr.log", s3Inputs, s3Outputs, - jobPaths, Seq.empty[AwsBatchParameter], None, None, None, None, None, None) + jobPaths, Seq.empty[AwsBatchParameter], None, None, None, None, None, None, None) 
job } @@ -197,31 +201,30 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi val job = generateBasicJob val retryFunctionText = s""" - |export AWS_METADATA_SERVICE_TIMEOUT=10 - |export AWS_METADATA_SERVICE_NUM_ATTEMPTS=10 - | |function _s3_localize_with_retry() { - | local s3_path=$$1 + | local s3_path="$$1" | # destination must be the path to a file and not just the directory you want the file in - | local destination=$$2 + | local destination="$$2" | | for i in {1..6}; | do | # abort if tries are exhausted | if [ "$$i" -eq 6 ]; then - | echo "failed to copy $$s3_path after $$(( $$i - 1 )) attempts. aborting" - | exit 2 + | echo "failed to copy $$s3_path after $$(( $$i - 1 )) attempts." + | LOCALIZATION_FAILED=1 + | break | fi | # check validity of source path - | if ! [[ $$s3_path =~ s3://([^/]+)/(.+) ]]; then - | echo "$$s3_path is not an S3 path with a bucket and key. aborting" - | exit 1 + | if ! [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then + | echo "$$s3_path is not an S3 path with a bucket and key." + | LOCALIZATION_FAILED=1 + | break | fi | # copy | /usr/local/aws-cli/v2/current/bin/aws s3 cp --no-progress "$$s3_path" "$$destination" || | { echo "attempt $$i to copy $$s3_path failed" && sleep $$((7 * "$$i")) && continue; } | # check data integrity - | _check_data_integrity $$destination $$s3_path || + | _check_data_integrity "$$destination" "$$s3_path" || | { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | # copy succeeded | break @@ -233,16 +236,23 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi it should "s3 delocalization with retry function in reconfigured script" in { val job = generateBasicJob - val delocalizeText = s""" - | + val delocalizeText = + s""" |function _s3_delocalize_with_retry() { | # input variables - | local local_path=$$1 + | local local_path="$$1" | # destination must be the path to a file and not just the directory you want the file in - | local destination=$$2 + | local destination="$$2" + | + | # if file/folder does not exist, return immediately + | if [[ ! -e "$$local_path" ]]; then + | echo "$$local_path does not exist. skipping delocalization" + | DELOCALIZATION_FAILED=1 + | return + | fi | | # get the multipart chunk size - | chunk_size=$$(_get_multipart_chunk_size $$local_path) + | chunk_size=$$(_get_multipart_chunk_size "$$local_path") | local MP_THRESHOLD=5368709120 | # then set them | /usr/local/aws-cli/v2/current/bin/aws configure set default.s3.multipart_threshold $$MP_THRESHOLD @@ -253,31 +263,37 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi | do | # if tries exceeded : abort | if [ "$$i" -eq 6 ]; then - | echo "failed to delocalize $$local_path after $$(( $$i - 1 )) attempts. aborting" - | exit 2 + | echo "failed to delocalize $$local_path after $$(( $$i - 1 )) attempts." + | DELOCALIZATION_FAILED=1 + | break | fi | # if destination is not a bucket : abort - | if ! [[ $$destination =~ s3://([^/]+)/(.+) ]]; then - | echo "$$destination is not an S3 path with a bucket and key. aborting" - | exit 1 + | if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then + | echo "$$destination is not an S3 path with a bucket and key." + | DELOCALIZATION_FAILED=1 + | break | fi | # copy ok or try again. | if [[ -d "$$local_path" ]]; then | # make sure to strip the trailing / in destination | destination=$${destination%/} | # glob directory. 
@@ -233,16 +236,23 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
 
   it should "s3 delocalization with retry function in reconfigured script" in {
     val job = generateBasicJob
-    val delocalizeText = s"""
-      |
+    val delocalizeText =
+      s"""
       |function _s3_delocalize_with_retry() {
       |  # input variables
-      |  local local_path=$$1
+      |  local local_path="$$1"
       |  # destination must be the path to a file and not just the directory you want the file in
-      |  local destination=$$2
+      |  local destination="$$2"
+      |
+      |  # if file/folder does not exist, return immediately
+      |  if [[ ! -e "$$local_path" ]]; then
+      |    echo "$$local_path does not exist. skipping delocalization"
+      |    DELOCALIZATION_FAILED=1
+      |    return
+      |  fi
       |
       |  # get the multipart chunk size
-      |  chunk_size=$$(_get_multipart_chunk_size $$local_path)
+      |  chunk_size=$$(_get_multipart_chunk_size "$$local_path")
       |  local MP_THRESHOLD=5368709120
       |  # then set them
       |  /usr/local/aws-cli/v2/current/bin/aws configure set default.s3.multipart_threshold $$MP_THRESHOLD
@@ -253,31 +263,37 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
       |  do
       |    # if tries exceeded : abort
       |    if [ "$$i" -eq 6 ]; then
-      |      echo "failed to delocalize $$local_path after $$(( $$i - 1 )) attempts. aborting"
-      |      exit 2
+      |      echo "failed to delocalize $$local_path after $$(( $$i - 1 )) attempts."
+      |      DELOCALIZATION_FAILED=1
+      |      break
       |    fi
       |    # if destination is not a bucket : abort
-      |    if ! [[ $$destination =~ s3://([^/]+)/(.+) ]]; then
-      |      echo "$$destination is not an S3 path with a bucket and key. aborting"
-      |      exit 1
+      |    if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then
+      |      echo "$$destination is not an S3 path with a bucket and key."
+      |      DELOCALIZATION_FAILED=1
+      |      break
       |    fi
       |    # copy ok or try again.
       |    if [[ -d "$$local_path" ]]; then
       |      # make sure to strip the trailing / in destination
       |      destination=$${destination%/}
       |      # glob directory. do recursive copy
-      |      /usr/local/aws-cli/v2/current/bin/aws s3 cp --no-progress $$local_path $$destination --recursive --exclude "cromwell_glob_control_file" ||
+      |      /usr/local/aws-cli/v2/current/bin/aws s3 cp --no-progress "$$local_path" "$$destination" --recursive --exclude "cromwell_glob_control_file" ||
       |        { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
-      |      # check integrity for each of the files
-      |      for FILE in $$(cd $$local_path ; ls | grep -v cromwell_glob_control_file); do
-      |        _check_data_integrity $$local_path/$$FILE $$destination/$$FILE ||
+      |      # check integrity for each of the files (allow spaces)
+      |      SAVEIFS="$$IFS"
+      |      IFS=$$'
+      |'
+      |      for FILE in $$(cd "$$local_path" ; ls | grep -v cromwell_glob_control_file); do
+      |        _check_data_integrity "$$local_path/$$FILE" "$$destination/$$FILE" ||
       |          { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; }
       |      done
+      |      IFS="$$SAVEIFS"
       |    else
       |      /usr/local/aws-cli/v2/current/bin/aws s3 cp --no-progress "$$local_path" "$$destination" ||
       |        { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
       |      # check content length for data integrity
-      |      _check_data_integrity $$local_path $$destination ||
+      |      _check_data_integrity "$$local_path" "$$destination" ||
       |        { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
       |    fi
       |    # copy succeeded
@@ -292,16 +308,16 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     val checkDataIntegrityBlock = s"""
       |function _check_data_integrity() {
-      |  local local_path=$$1
-      |  local s3_path=$$2
+      |  local local_path="$$1"
+      |  local s3_path="$$2"
       |
       |  # remote : use content_length
-      |  if [[ $$s3_path =~ s3://([^/]+)/(.+) ]]; then
+      |  if [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then
       |    bucket="$${BASH_REMATCH[1]}"
       |    key="$${BASH_REMATCH[2]}"
       |  else
       |    # this is already checked in the caller function
-      |    echo "$$s3_path is not an S3 path with a bucket and key. aborting"
+      |    echo "$$s3_path is not an S3 path with a bucket and key."
       |    exit 1
       |  fi
       |  s3_content_length=$$(/usr/local/aws-cli/v2/current/bin/aws s3api head-object --bucket "$$bucket" --key "$$key" --query 'ContentLength') ||
@@ -315,8 +331,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
       |  else
       |    false
       |  fi
-      |}
-      |""".stripMargin
+      |}""".stripMargin
     job.reconfiguredScript should include (checkDataIntegrityBlock)
   }
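Note (aside, not part of the diff): _check_data_integrity accepts a transfer only when the local byte count equals the S3 object's ContentLength. The equivalent check in Scala, assuming the AWS SDK v2 S3 client; illustration only, not the code Cromwell runs:

    import java.nio.file.{Files, Paths}
    import software.amazon.awssdk.services.s3.S3Client
    import software.amazon.awssdk.services.s3.model.HeadObjectRequest

    // Compares the local file size with the ContentLength reported by
    // HeadObject, exactly what the script reads via `s3api head-object`.
    def contentLengthsMatch(s3: S3Client, bucket: String, key: String, localPath: String): Boolean = {
      val remote: Long = s3.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()).contentLength()
      val local: Long = Files.size(Paths.get(localPath))
      remote == local
    }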
@@ -325,9 +340,9 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     val getMultiplePartChunkSize = s"""
       |function _get_multipart_chunk_size() {
-      |  local file_path=$$1
+      |  local file_path="$$1"
       |  # file size
-      |  file_size=$$(stat --printf="%s" $$file_path)
+      |  file_size=$$(stat --printf="%s" "$$file_path")
       |  # chunk_size : you can have at most 10K parts with at least one 5MB part
       |  # this reflects the formula in s3-copy commands of cromwell (S3FileSystemProvider.java)
       |  # => long partSize = Math.max((objectSize / 10000L) + 1, 5 * 1024 * 1024);
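Note (aside, not part of the diff): the comment above quotes the part-size formula from S3FileSystemProvider.java. A worked example of that arithmetic, illustration only:

    // partSize = max(objectSize / 10000 + 1, 5 MiB): at most 10k parts,
    // each at least 5 MiB.
    object MultipartChunkSizeExample extends App {
      def partSize(objectSize: Long): Long =
        math.max(objectSize / 10000L + 1, 5L * 1024 * 1024)

      println(partSize(1024L * 1024))              // 1 MiB file  -> 5242880 (5 MiB floor)
      println(partSize(100L * 1024 * 1024 * 1024)) // 100 GiB file -> 10737419, keeping parts under the 10k cap
    }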
@@ -347,14 +362,27 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     s"""
       |{
       |set -e
+      |# (re-)add tags to include added volumes:
+      |if [[ "false" == "true" ]]; then
+      |  echo "*** TAGGING RESOURCES ***"
+      |  _add_tags
+      |fi
+      |
       |echo '*** DELOCALIZING OUTPUTS ***'
-      |_s3_delocalize_with_retry /tmp/scratch/baa s3://bucket/somewhere/baa
+      |DELOCALIZATION_FAILED=0
+      |_s3_delocalize_with_retry "/tmp/scratch/baa" "s3://bucket/somewhere/baa"
       |
-      |if [ -f /tmp/scratch/hello-rc.txt ]; then _s3_delocalize_with_retry /tmp/scratch/hello-rc.txt ${job.jobPaths.returnCode} ; fi
-      |if [ -f /tmp/scratch/hello-stderr.log ]; then _s3_delocalize_with_retry /tmp/scratch/hello-stderr.log ${job.jobPaths.standardPaths.error}; fi
-      |if [ -f /tmp/scratch/hello-stdout.log ]; then _s3_delocalize_with_retry /tmp/scratch/hello-stdout.log ${job.jobPaths.standardPaths.output}; fi
+      |if [ -f "/tmp/scratch/hello-rc.txt" ]; then _s3_delocalize_with_retry "/tmp/scratch/hello-rc.txt" "${job.jobPaths.returnCode}" ; fi
+      |if [ -f "/tmp/scratch/hello-stderr.log" ]; then _s3_delocalize_with_retry "/tmp/scratch/hello-stderr.log" "${job.jobPaths.standardPaths.error}"; fi
+      |if [ -f "/tmp/scratch/hello-stdout.log" ]; then _s3_delocalize_with_retry "/tmp/scratch/hello-stdout.log" "${job.jobPaths.standardPaths.output}"; fi
       |
-      |echo '*** COMPLETED DELOCALIZATION ***'
+      |if [[ $$DELOCALIZATION_FAILED -eq 1 ]]; then
+      |  echo '*** DELOCALIZATION FAILED ***'
+      |  echo '*** EXITING WITH RETURN CODE 1***'
+      |  exit 1
+      |else
+      |  echo '*** COMPLETED DELOCALIZATION ***'
+      |fi
       |echo '*** EXITING WITH RETURN CODE ***'
       |rc=$$(head -n 1 /tmp/scratch/hello-rc.txt)
       |echo $$rc
@@ -370,11 +398,24 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     s"""
       |{
       |set -e
+      |# tag instance and volumes to ensure tags are present in case of failure:
+      |if [[ "false" == "true" ]]; then
+      |  echo "*** TAGGING RESOURCES ***"
+      |  _add_tags
+      |fi
+      |
       |echo '*** LOCALIZING INPUTS ***'
       |if [ ! -d /tmp/scratch ]; then mkdir /tmp/scratch && chmod 777 /tmp/scratch; fi
       |cd /tmp/scratch
-      |_s3_localize_with_retry s3://bucket/foo /tmp/scratch/foo
-      |echo '*** COMPLETED LOCALIZATION ***'
+      |# make sure localization completes successfully
+      |LOCALIZATION_FAILED=0
+      |_s3_localize_with_retry "s3://bucket/foo" "/tmp/scratch/foo"
+      |if [[ $$LOCALIZATION_FAILED -eq 1 ]]; then
+      |  echo '*** LOCALIZATION FAILED ***'
+      |  exit 1
+      |else
+      |  echo '*** COMPLETED LOCALIZATION ***'
+      |fi
       |set +e
       |}
       |""".stripMargin
@@ -441,5 +482,25 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     val jobDefinition = StandardAwsBatchJobDefinitionBuilder.build(batchJobDefintion.copy(runtimeAttributes = runtime))
     val actual = jobDefinition.containerProperties.resourceRequirements
-    expected should equal(CollectionConverters.asScala(actual).toSeq)}
+    expected should equal(CollectionConverters.asScala(actual).toSeq)
+  }
+
+  it should "use default shared memory size of 64MB" in {
+    val jobDefinition = StandardAwsBatchJobDefinitionBuilder.build(batchJobDefintion)
+    val actual = jobDefinition.containerProperties.linuxParameters()
+    val expected = LinuxParameters.builder().sharedMemorySize(64).build()
+    expected should equal(actual)
+  }
+
+  it should "use user shared memory size if set" in {
+    val runtime = runtimeAttributes.copy(
+      gpuCount = 1,
+      sharedMemorySize = refineMV[Positive](100)
+    )
+    val jobDefinition = StandardAwsBatchJobDefinitionBuilder.build(batchJobDefintion.copy(runtimeAttributes = runtime))
+    val actual = jobDefinition.containerProperties.linuxParameters()
+    val expected = LinuxParameters.builder().sharedMemorySize(100).build()
+    expected should equal(actual)
+  }
+
 }
diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala
index a9411b574b5..1a217844205 100644
--- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala
+++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala
@@ -71,7 +71,8 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout
     Vector(Map.empty[String, String]),
     Vector(Map.empty[String, String]),
     false,
-    false
+    false,
+    sharedMemorySize = refineMV[Positive](64)
   )
 
   val expectedDefaultsLocalFS = new AwsBatchRuntimeAttributes(refineMV[Positive](1), 0, Vector("us-east-1a", "us-east-1b"),
@@ -88,6 +89,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout
     Vector(Map.empty[String, String]),
     false,
     false,
+    refineMV[Positive](64),
     "local")
 
   "AwsBatchRuntimeAttributes" should {
@@ -408,6 +410,17 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout
       ))
     }
 
+    "if sharedMemorySize is set" in {
+      val runtimeAttributes = Map(
+        "docker" -> WomString("ubuntu:latest"),
+        "scriptBucketName" -> WomString("my-stuff"),
+        "sharedMemorySize" -> WomInteger(10)
+      )
+      assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedDefaults.copy(
+        sharedMemorySize = refineMV[Positive](10)
+      ))
+    }
+
     "missing or invalid action key result in an invalid awsBatchEvaluateOnExit" in {
       val invalidEvaluateOnExit = List(
         // missing action key
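Note (aside, not part of the diff): the two sharedMemorySize tests above document the contract of this change: 64 MB of shared memory by default, or the user-supplied value when set. A sketch of the presumed wiring, assumed shape rather than the exact builder code; AWS Batch forwards LinuxParameters.sharedMemorySize to Docker as --shm-size:

    import software.amazon.awssdk.services.batch.model.{ContainerProperties, LinuxParameters}

    // Attaches the validated runtime attribute (in MiB) to the container
    // definition the way the assertions above expect to observe it.
    def withSharedMemory(builder: ContainerProperties.Builder, sharedMemoryMb: Int): ContainerProperties.Builder =
      builder.linuxParameters(LinuxParameters.builder().sharedMemorySize(sharedMemoryMb).build())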
diff --git a/wom/src/main/scala/wom/RuntimeAttributes.scala b/wom/src/main/scala/wom/RuntimeAttributes.scala
index 853e3145c23..44ba198cf70 100644
--- a/wom/src/main/scala/wom/RuntimeAttributes.scala
+++ b/wom/src/main/scala/wom/RuntimeAttributes.scala
@@ -32,6 +32,7 @@ object RuntimeAttributesKeys {
   val OutDirMaxKey = "outDirMax"
   val FailOnStderrKey = "failOnStderr"
   val ContinueOnReturnCodeKey = "continueOnReturnCode"
+  val sharedMemoryKey = "sharedMemorySize"
 }
 
 case class RuntimeAttributes(attributes: Map[String, WomExpression])
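Note (aside, not part of the diff): a hypothetical consumer of the new key, illustration only; backends can reference the attribute name through this constant instead of hard-coding the string:

    import wom.RuntimeAttributesKeys

    object SharedMemoryKeyExample extends App {
      // Hypothetical attribute map; 64 mirrors the default used elsewhere in this change set.
      val declared = Map(RuntimeAttributesKeys.sharedMemoryKey -> 128)
      val sharedMemoryMb = declared.getOrElse(RuntimeAttributesKeys.sharedMemoryKey, 64)
      println(sharedMemoryMb) // 128
    }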