diff --git a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala index 2faf49696ac..44451a1e791 100644 --- a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala @@ -54,6 +54,10 @@ object DefaultIoCommand { override def commandDescription: String = s"DefaultIoExistsCommand file '$file'" } + case class DefaultIoExistsOrThrowCommand(override val file: Path) extends IoExistsOrThrowCommand(file) { + override def commandDescription: String = s"DefaultIoExistsOrThrowCommand file '$file'" + } + case class DefaultIoReadLinesCommand(override val file: Path) extends IoReadLinesCommand(file) { override def commandDescription: String = s"DefaultIoReadLinesCommand file '$file'" } diff --git a/core/src/main/scala/cromwell/core/io/IoCommand.scala b/core/src/main/scala/cromwell/core/io/IoCommand.scala index 9db50bc2c87..ec90c3b6c5c 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommand.scala @@ -182,6 +182,14 @@ abstract class IoExistsCommand(val file: Path) extends SingleFileIoCommand[Boole override lazy val name = "exist" } +/** + * Check whether a file exists but throw an exception if it doesn't + */ +abstract class IoExistsOrThrowCommand(val file: Path) extends SingleFileIoCommand[Boolean] { + override def toString = s"Throw error if ${file.pathAsString} does not exist" + override lazy val name = "exist" +} + /** * Return the lines of a file in a collection */ diff --git a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala index c4f5da49959..ae5ef05e252 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala @@ -21,6 +21,7 @@ abstract class PartialIoCommandBuilder { def hashCommand: PartialFunction[Path, Try[IoHashCommand]] = PartialFunction.empty def touchCommand: PartialFunction[Path, Try[IoTouchCommand]] = PartialFunction.empty def existsCommand: PartialFunction[Path, Try[IoExistsCommand]] = PartialFunction.empty + def existsOrThrowCommand: PartialFunction[Path, Try[IoExistsOrThrowCommand]] = PartialFunction.empty def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = PartialFunction.empty def readLinesCommand: PartialFunction[Path, Try[IoReadLinesCommand]] = PartialFunction.empty } @@ -94,6 +95,9 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp def existsCommand(file: Path): Try[IoExistsCommand] = buildOrDefault(_.existsCommand, file, DefaultIoExistsCommand(file)) + def existsOrThrowCommand(file: Path): Try[IoExistsOrThrowCommand] = + buildOrDefault(_.existsOrThrowCommand, file, DefaultIoExistsOrThrowCommand(file)) + def isDirectoryCommand(file: Path): Try[IoIsDirectoryCommand] = buildOrDefault(_.isDirectoryCommand, file, DefaultIoIsDirectoryCommand(file)) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index 5be9ad36d9c..a2de8303a23 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -20,6 +20,7 @@ import net.ceedubs.ficus.readers.ValueReader import java.io._ import java.nio.charset.StandardCharsets +import java.nio.file.NoSuchFileException import scala.concurrent.ExecutionContext import 
scala.concurrent.duration.FiniteDuration @@ -78,6 +79,7 @@ class NioFlow(parallelism: Int, case hashCommand: IoHashCommand => hash(hashCommand) map hashCommand.success case touchCommand: IoTouchCommand => touch(touchCommand) map touchCommand.success case existsCommand: IoExistsCommand => exists(existsCommand) map existsCommand.success + case existsOrThrowCommand: IoExistsOrThrowCommand => existsOrThrow(existsOrThrowCommand) map existsOrThrowCommand.success case readLinesCommand: IoReadLinesCommand => readLines(readLinesCommand) map readLinesCommand.success case isDirectoryCommand: IoIsDirectoryCommand => isDirectory(isDirectoryCommand) map isDirectoryCommand.success case _ => IO.raiseError(new UnsupportedOperationException("Method not implemented")) @@ -183,6 +185,13 @@ exists.file.exists } + private def existsOrThrow(exists: IoExistsOrThrowCommand) = IO { + if (exists.file.exists) true + else throw new NoSuchFileException(exists.file.toString) + } + private def readLines(exists: IoReadLinesCommand) = IO { exists.file.withReader { reader => LazyList.continually(reader.readLine()).takeWhile(_ != null).toList diff --git a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala index 878b6c5bc26..d41bfd26d67 100644 --- a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala +++ b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala @@ -90,6 +90,10 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder override def existsCommand: PartialFunction[Path, Try[S3BatchExistsCommand]] = { case path: S3Path => Try(S3BatchExistsCommand(path)) } + + override def existsOrThrowCommand: PartialFunction[Path, Try[S3BatchExistsOrThrowCommand]] = { case path: S3Path => + Try(S3BatchExistsOrThrowCommand(path)) + } } /** diff --git a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala index 696c4d0240c..5a98e46e554 100644 --- a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala +++ b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala @@ -38,12 +38,14 @@ import cromwell.core.io.{ IoCopyCommand, IoDeleteCommand, IoExistsCommand, + IoExistsOrThrowCommand, IoHashCommand, IoSizeCommand, IoTouchCommand } import cromwell.filesystems.s3.S3Path +import java.nio.file.NoSuchFileException /** * Io commands with S3 paths and some logic enabling batching of request.
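The contract shared by both implementations above is "return true or raise": a missing file surfaces as `NoSuchFileException` rather than a `false` that callers could silently treat as acceptable. A minimal standalone sketch of that contract, using plain `java.nio` paths instead of Cromwell's `Path` wrapper (illustration only, not code from this diff):

```scala
import java.nio.file.{Files, NoSuchFileException, Path, Paths}

// Sketch of the existsOrThrow contract: never return false. A missing file
// is raised as NoSuchFileException, which lets batched S3 HEAD checks and
// local NIO checks share a single Boolean success type.
def existsOrThrow(file: Path): Boolean =
  if (Files.exists(file)) true
  else throw new NoSuchFileException(file.toString)

// existsOrThrow(Paths.get("/tmp/missing-file")) // throws NoSuchFileException
```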
@@ -143,3 +145,20 @@ case class S3BatchExistsCommand(override val file: S3Path) } override def commandDescription: String = s"S3BatchExistsCommand file '$file'" } + +/** + * `IoCommand` to determine the existence of an object in S3, but throws an exception if the object can't be found + * @param file the path to the object + */ +case class S3BatchExistsOrThrowCommand(override val file: S3Path) + extends IoExistsOrThrowCommand(file) + with S3BatchHeadCommand[Boolean] { + override def mapResponse(response: HeadObjectResponse): Boolean = true + override def onFailure(error: SdkException): Option[Left[Boolean, Nothing]] = + // If the object can't be found, fail the request and throw an exception + error match { + case _: NoSuchKeyException => throw new NoSuchFileException(file.toString) + case _ => None + } + override def commandDescription: String = s"S3BatchExistsOrThrowCommand file '$file'" +} diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 15a3fdeff8a..3c5b0acbc72 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -427,18 +427,17 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL // need to make md5sum? val md5_cmd = if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) { Log.debug("Add cmd to create MD5 sibling.") - // this does NOT regenerate the md5 in case the file is overwritten ! - // TODO : Add check for file age => recreate if md5 older than main file + // generate MD5 if missing or if local file is newer than sibling md5 s""" - |if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]] ; then + |if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then | # the glob list | md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 ); | # globbed files, using specified number of cpus for parallel processing. - SAVEIFS="$$IFS" - |IFS=$$'\n' + | SAVEIFS="$$IFS" + | IFS=$$'\n' | cat "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" | xargs -I% -P${runtimeAttributes.cpu.##.toString} bash -c "md5sum ${globDirectory}/% > ${globDirectory}/%.md5" + | IFS="$$SAVEIFS" |fi - |IFS="$$SAVEIFS" |""".stripMargin } // return combined result @@ -461,7 +460,6 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL Log.debug("output Data on working disk mount" + output.local.pathAsString) s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" """.stripMargin - // file(name (full path), s3key (delocalized path), local (file basename), mount (disk details)) // files on EFS mounts are optionally delocalized.
case output: AwsBatchFileOutput if efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get => Log.debug("EFS output file detected: "+ output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}") @@ -478,9 +476,9 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL // need to make md5sum? var md5_cmd = "" if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) { - Log.debug("Add cmd to create MD5 sibling.") + Log.debug("Add cmd to create MD5 sibling if missing or outdated.") md5_cmd = s""" - |if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]] ; then + |if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then | md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 ); |fi |""".stripMargin diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index 86680f4dcab..7f2cbd1b953 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -42,6 +42,7 @@ import java.security.MessageDigest import org.apache.commons.lang3.builder.{ToStringBuilder, ToStringStyle} import org.slf4j.{Logger, LoggerFactory} import wdl4s.parser.MemoryUnit +import wom.format.MemorySize /** * Responsible for the creation of the job definition. @@ -164,8 +165,8 @@ trait AwsBatchJobDefinitionBuilder { efsMakeMD5: Boolean, tagResources: Boolean, logGroupName: String, - sharedMemorySize: Int): String = { - s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:$logGroupName" + sharedMemorySize: MemorySize): String = { + s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:$logGroupName:${sharedMemorySize.to(MemoryUnit.MB).amount.toInt}" } val environment = List.empty[KeyValuePair] @@ -201,9 +202,8 @@ trait AwsBatchJobDefinitionBuilder { efsMakeMD5, tagResources, logGroupName, - context.runtimeAttributes.sharedMemorySize.value + context.runtimeAttributes.sharedMemorySize ) - // To reuse job definition for gpu and gpu-runs, we will create a job definition that does not gpu requirements // since aws batch does not allow you to set gpu as 0 when you dont need it. 
you will always need cpu and memory (ContainerProperties.builder() @@ -219,8 +219,8 @@ trait AwsBatchJobDefinitionBuilder { .environment(environment.asJava) .ulimits(ulimits.asJava) .linuxParameters( - LinuxParameters.builder().sharedMemorySize(context.runtimeAttributes.sharedMemorySize.##).build() - ), + LinuxParameters.builder().sharedMemorySize(context.runtimeAttributes.sharedMemorySize.to(MemoryUnit.MB).amount.toInt).build() // Convert MemorySize to MB + ), containerPropsName) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index fdf4021f257..e90f2feeb78 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -42,13 +42,14 @@ import eu.timepit.refined.api.Refined import eu.timepit.refined.numeric.Positive import wom.RuntimeAttributesKeys import wom.format.MemorySize +import wdl4s.parser.MemoryUnit import wom.types._ import wom.values._ import com.typesafe.config.{ConfigException, ConfigValueFactory} import scala.util.matching.Regex import org.slf4j.{Logger, LoggerFactory} -import wom.RuntimeAttributesKeys.{GpuKey, sharedMemoryKey} +import wom.RuntimeAttributesKeys.GpuKey import scala.util.{Failure, Success, Try} import scala.jdk.CollectionConverters._ @@ -94,7 +95,7 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, ulimits: Vector[Map[String, String]], efsDelocalize: Boolean, efsMakeMD5 : Boolean, - sharedMemorySize: Int Refined Positive, + sharedMemorySize: MemorySize, logGroupName: String, additionalTags: Map[String, String], fileSystem: String= "s3", @@ -110,7 +111,7 @@ object AwsBatchRuntimeAttributes { val awsBatchEvaluateOnExitKey = "awsBatchEvaluateOnExit" - val defaultSharedMemorySize = WomInteger(64) + val defaultSharedMemorySize = MemorySize(64, MemoryUnit.MB) private val awsBatchEvaluateOnExitDefault = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) @@ -179,9 +180,10 @@ object AwsBatchRuntimeAttributes { noAddressValidationInstance .withDefault(noAddressValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse NoAddressDefaultValue) - private def sharedMemorySizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Refined[Int, Positive]] = { - SharedMemorySizeValidation(sharedMemoryKey).withDefault( - SharedMemorySizeValidation(sharedMemoryKey).configDefaultWomValue(runtimeConfig).getOrElse(defaultSharedMemorySize) + private def sharedMemorySizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { + MemoryValidation.withDefaultMemory( + RuntimeAttributesKeys.sharedMemoryKey, + MemoryValidation.configDefaultString(RuntimeAttributesKeys.sharedMemoryKey, runtimeConfig) getOrElse defaultSharedMemorySize.toString ) } @@ -337,7 +339,7 @@ object AwsBatchRuntimeAttributes { val efsDelocalize: Boolean = RuntimeAttributesValidation.extract(awsBatchefsDelocalizeValidation(runtimeAttrsConfig),validatedRuntimeAttributes) val efsMakeMD5: Boolean = RuntimeAttributesValidation.extract(awsBatchefsMakeMD5Validation(runtimeAttrsConfig),validatedRuntimeAttributes) val tagResources: Boolean = RuntimeAttributesValidation.extract(awsBatchtagResourcesValidation(runtimeAttrsConfig),validatedRuntimeAttributes) - val 
sharedMemorySize: Int Refined Positive = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val sharedMemorySize: MemorySize = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( cpu, @@ -708,12 +710,6 @@ class AwsBatchtagResourcesValidation(key: String) extends BooleanRuntimeAttribut override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be a Boolean" } -object SharedMemorySizeValidation { - def apply(key: String): SharedMemorySizeValidation = new SharedMemorySizeValidation(key) -} - -class SharedMemorySizeValidation(key: String) extends PositiveIntRuntimeAttributesValidation(key) - object UlimitsValidation extends RuntimeAttributesValidation[Vector[Map[String, String]]] { override def key: String = AwsBatchRuntimeAttributes.UlimitsKey diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md index 4869dd5e573..eb998dceb0f 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md @@ -75,7 +75,16 @@ This runtime attribute adds support to [*AWS Batch Automated Job Retries*](https It takes an Int, between 1 and 10, as a value that indicates the maximum number of times AWS Batch should retry a failed task. If the value 0 is passed, the [*Retry Strategy*](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#retryStrategy) will not be added to the job definiton and the task will run just once. -This configuration should be passed in the `options.json` file when launching the pipeline. +This configuration should be passed in the `options.json` file when launching the pipeline, as a `default_runtime_attributes` entry: + +``` +{ + "default_runtime_attributes": { + "awsBatchRetryAttempts": 2 + } +} +``` +Alternatively, it can be provided per task, in the `runtime` section of the WDL: ``` runtime { @@ -83,6 +92,8 @@ runtime { } ``` +_Note: see also the `maxRetries` runtime attribute._ + ### `awsBatchEvaluteOnExit` *Default: _[]_* - will always retry @@ -123,14 +134,17 @@ A list of [`ulimits`](https://docs.aws.amazon.com/batch/latest/userguide/job_def This configuration should be passed in the `options.json` file when launching the pipeline. ``` -"ulimits": [ - { - "name": string, - "softLimit": integer, - "hardLimit": integer +{ + "default_runtime_attributes": { + "ulimits": [ + { + "name": "string", + "softLimit": "integer", + "hardLimit": "integer" + } + ] } - ... -] +} ``` Parameter description: - `name` - The type of the `ulimit`. - Type: String - Required: Yes, when `ulimits` is used. - `softLimit` - The soft limit for the `ulimit` type. - - Type: Integer + - Type: Integer, provided as a quoted string - Required: Yes, when `ulimits` is used. - `hardLimit` - The hard limit for the `ulimit` type. - - Type: Integer + - Type: Integer, provided as a quoted string - Required: Yes, when `ulimits` is used. 
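For reference, a complete `options.json` combining the attributes above could look as follows (the attribute values here are illustrative only):

```
{
  "default_runtime_attributes": {
    "awsBatchRetryAttempts": 2,
    "ulimits": [
      {
        "name": "nofile",
        "softLimit": "10240",
        "hardLimit": "10240"
      }
    ]
  }
}
```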
### GPU support @@ -177,6 +191,27 @@ the gpuCount value will be passed to AWS Batch as part of [resourceRequirements] You will need to use this feature in conjunction with a aws queue that has GPU instances (see [compute-environment](/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/DEPLOY.md#compute-environment) for more inforamtion) +### Shared Memory Support + +Tasks can request shared memory by setting `sharedMemorySize` in the task's runtime attributes. This is required for workflows with tasks that use shared memory, for example tasks built on popular ML libraries such as PyTorch. The memory is available under /dev/shm inside the task. The value is provided as a memory size with a unit suffix (e.g. MB or GB). For instance, to request 1 GB (1024 MB) of shared memory: + +``` +task shared_memory_task { + input { + ... + } + command <<< + ... + >>> + output {} + runtime { + sharedMemorySize: "1024 MB" + } +} +``` + + + ### Call Caching with ECR private AWS ECR is a private container registry, for which access can be regulated using IAM. Call caching is possible by setting up the following configuration: @@ -350,7 +385,7 @@ Cromwell EC2 instances can be equipped with a shared elastic filesystem, termed Following the [GenomicsWorkFlows](https://docs.opendata.aws/genomics-workflows/orchestration/cromwell/cromwell-overview.html) deployment stack and selecting "Create EFS", you will end up with an EFS filesystem accessible within the provided subnets. It is mounted by default in EC2 workers under /mnt/efs. This is specified in the launch templates USER_DATA if you want to change this. *BEWARE* : For globbing to work on EFS, the mountpoint must be left as "/mnt/efs" ! -Next, it is recommended to change the EFS setup (in console : select EFS service, then the "SharedDataGenomics" volume, edit). Change performance settings to Enhanced/Elastic, because the bursting throughput is usually insufficient. Optionally set the lifecycle to archive infrequently accessed data and reduce costs. +Next, it is recommended to change the EFS setup (in console : select EFS service, then the "SharedDataGenomics" volume, edit). Change the performance settings to Enhanced/Elastic, because the bursting throughput is usually insufficient (note that Elastic throughput can incur significant extra cost). Optionally set the lifecycle to archive infrequently accessed data and reduce costs. 2. Set Cromwell Configuration @@ -544,6 +579,7 @@ task task_three { ``` #### TAGGING RESOURCES + AWS Batch tags jobs and, if configured in the compute environment, instances and volumes with generic tags to track costs. These tags typically include the job-queue name. 
To allow more detailed cost tracking, it is possible to enable tagging instances and connected volumes with the following information : - *cromwell-workflow-name* : the top-level name of the submitted WDL (eg "workflow myWorkflow {...}") @@ -556,7 +592,7 @@ In case the same instance is reused for multiple tasks, unique tag values are co - cromwell-workflow-id : 2443daac-c232-4e0a-920d-fbf53273e9c5;df19029e-cc02-41d5-a26d-8d30c0ab05cb - cromwell-task-id : myWorkflow.myTask-None-1 -To enable tagging, add "tagResources = true" to the default-runtime-attributes section of your configuration: +To enable default tagging, add "tagResources = true" to the default-runtime-attributes section of your configuration: ``` backend { @@ -575,6 +611,28 @@ backend { ``` +Additional custom tags can be added to jobs using the "additionalTags" parameter in the "default-runtime-attributes" section of the backend configuration: + +``` +backend { + providers { + AWSBatch { + config { + default-runtime-attributes { + logGroupName: "/Cromwell/job/" + additionalTags { + projectid: "project1" + } + } + } + } + } +} +``` + +The _logGroupName_ setting sends the job logs to a custom log group and tags the jobs that Cromwell submits. The _additionalTags_ setting specifies extra tags to be added to the jobs as key : value pairs. + + AWS Batch --------- diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala index b15dc52b364..7610954df3a 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala @@ -36,7 +36,7 @@ import cromwell.backend.impl.aws.{AWSBatchStorageSystems, AwsBatchBackendInitial import cromwell.backend.io.JobPaths import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardCacheHitCopyingActorParams} import cromwell.core.CallOutputs -import cromwell.core.io.{DefaultIoCommandBuilder, IoCommand, IoCommandBuilder, IoTouchCommand} +import cromwell.core.io.{DefaultIoCommandBuilder, IoCommand, IoCommandBuilder} import cromwell.core.path.Path import cromwell.core.simpleton.{WomValueBuilder, WomValueSimpleton} import cromwell.filesystems.s3.batch.S3BatchCommandBuilder @@ -61,10 +61,11 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin sourceCallRootPath: Path ): Try[(CallOutputs, Set[IoCommand[_]])] = (batchAttributes.fileSystem, cachingStrategy) match { - case (AWSBatchStorageSystems.s3, UseOriginalCachedOutputs) => - val touchCommands: Seq[Try[IoTouchCommand]] = womValueSimpletons collect { + case (AWSBatchStorageSystems.s3, UseOriginalCachedOutputs) => + val touchCommands: Seq[Try[IoCommand[_]]] = womValueSimpletons collect { + // only build existence checks for WomFiles; other simpletons are skipped
case WomValueSimpleton(_, wdlFile: WomFile) => - getPath(wdlFile.value) flatMap S3BatchCommandBuilder.touchCommand + getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.existsOrThrowCommand) } TryUtil.sequence(touchCommands) map { @@ -88,7 +89,7 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin Try { // PROD-444: Keep It Short and Simple: Throw on the first error and let the outer Try catch-and-re-wrap (newDetritus + (JobPaths.CallRootPathKey -> destinationCallRootPath)) -> - newDetritus.values.map(S3BatchCommandBuilder.touchCommand(_).get).toSet + newDetritus.values.map(S3BatchCommandBuilder.existsOrThrowCommand(_).get).toSet } } case (_, _) => super.processDetritus(sourceJobDetritusFiles) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala index 5736b6bf130..eb6a9f772e7 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala @@ -39,10 +39,12 @@ import cromwell.core.io.DefaultIoCommandBuilder import scala.util.Try import cromwell.backend.standard.callcaching.StandardFileHashingActor.SingleFileHashRequest import cromwell.core.path.DefaultPathBuilder +import org.slf4j.{Logger, LoggerFactory} class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorParams) extends StandardFileHashingActor(standardParams) { + val Log: Logger = LoggerFactory.getLogger(getClass) override val ioCommandBuilder = BackendInitializationData .as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption) .configuration @@ -55,21 +57,32 @@ class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorPa val aws_config = BackendInitializationData.as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption).configuration // custom strategy to handle efs (local) files, in case sibling-md5 file is present. + // if a valid md5 is found : return the hash + // if no md5 is found : return None (pass the request to the parent hashing actor) + // if an outdated md5 is found : return an invalid string (assume the file was altered after md5 creation) + // if the file is missing : return an invalid string override def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = { val file = DefaultPathBuilder.get(fileRequest.file.valueString) if (aws_config.efsMntPoint.isDefined && file.toString.startsWith(aws_config.efsMntPoint.getOrElse("--")) && aws_config.checkSiblingMd5.getOrElse(false)) { val md5 = file.sibling(s"${file.toString}.md5") // check existance of the file : if (!file.exists) { + Log.debug(s"File Missing: ${file.toString}") // if missing, cache hit is invalid; return invalid md5 Some("File Missing").map(str => Try(str)) } - // check existence of the sibling file - else if (md5.exists) { + // check existence of the sibling file and make sure it's newer than the main file + else if (md5.exists && md5.lastModifiedTime.isAfter(file.lastModifiedTime)) { // read the file. 
+ Log.debug(s"Found valid sibling md5 file for ${file.toString}") val md5_value: Option[String] = Some(md5.contentAsString.split("\\s+")(0)) md5_value.map(str => Try(str)) + } else if (md5.exists && md5.lastModifiedTime.isBefore(file.lastModifiedTime)) { + // sibling md5 file is outdated, return invalid md5 + Log.debug(s"Found outdated sibling md5 file for ${file.toString}") + Some("Checksum File Outdated").map(str => Try(str)) } else { + Log.debug(s"Found no sibling md5 file for ${file.toString}") // File present, but no sibling found, fall back to default. None } diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala index a91e11ba43a..bbde4840705 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala @@ -110,7 +110,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi val s3Outputs: Set[AwsBatchFileOutput] = Set(AwsBatchFileOutput("baa", "s3://bucket/somewhere/baa", DefaultPathBuilder.get("baa"), AwsBatchWorkingDisk())) val cpu: Int Refined Positive = 2 - val sharedMemorySize: Int Refined Positive = 64 + val sharedMemorySize: MemorySize = MemorySize.parse("64 MB").get val runtimeAttributes: AwsBatchRuntimeAttributes = new AwsBatchRuntimeAttributes( cpu = cpu,
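The sibling-md5 logic introduced in `AwsBatchBackendFileHashingActor` condenses to a four-way decision. The sketch below restates it with plain `java.nio` types instead of Cromwell's `Path` wrapper, as an illustration of the intended behavior rather than code from the diff:

```scala
import java.nio.file.{Files, Path}
import scala.util.Try

// EFS sibling-md5 hashing, condensed:
//   file missing        -> Some("File Missing")           (invalid hash, cache hit is rejected)
//   md5 newer than file -> Some(first token of the .md5)  (reuse the precomputed hash)
//   md5 older than file -> Some("Checksum File Outdated") (invalid hash, cache hit is rejected)
//   no md5 sibling      -> None                           (fall back to default hashing)
def siblingMd5Hash(file: Path): Option[Try[String]] = {
  val md5 = file.resolveSibling(file.getFileName.toString + ".md5")
  if (!Files.exists(file)) Some(Try("File Missing"))
  else if (Files.exists(md5) && Files.getLastModifiedTime(md5).compareTo(Files.getLastModifiedTime(file)) > 0)
    Some(Try(new String(Files.readAllBytes(md5), "UTF-8").split("\\s+")(0)))
  else if (Files.exists(md5) && Files.getLastModifiedTime(md5).compareTo(Files.getLastModifiedTime(file)) < 0)
    Some(Try("Checksum File Outdated"))
  else None
}
```

Note that, as in the diff, an md5 sibling with a timestamp exactly equal to the main file's falls through to the default hashing path.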