Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/efs caching #50

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ object DefaultIoCommand {
override def commandDescription: String = s"DefaultIoExistsCommand file '$file'"
}

case class DefaultIoExistsOrThrowCommand(override val file: Path) extends IoExistsOrThrowCommand(file) {
override def commandDescription: String = s"DefaultIoExistsOrThrowCommand file '$file'"
}

case class DefaultIoReadLinesCommand(override val file: Path) extends IoReadLinesCommand(file) {
override def commandDescription: String = s"DefaultIoReadLinesCommand file '$file'"
}
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/cromwell/core/io/IoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ abstract class IoExistsCommand(val file: Path) extends SingleFileIoCommand[Boole
override lazy val name = "exist"
}

/**
* Check whether a file exists but throw an exception if it doesn't
*/
abstract class IoExistsOrThrowCommand(val file: Path) extends SingleFileIoCommand[Boolean] {
override def toString = s"Throw error if ${file.pathAsString} does not exist"
override lazy val name = "exist"
}

/**
* Return the lines of a file in a collection
*/
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ abstract class PartialIoCommandBuilder {
def hashCommand: PartialFunction[Path, Try[IoHashCommand]] = PartialFunction.empty
def touchCommand: PartialFunction[Path, Try[IoTouchCommand]] = PartialFunction.empty
def existsCommand: PartialFunction[Path, Try[IoExistsCommand]] = PartialFunction.empty
def existsOrThrowCommand: PartialFunction[Path, Try[IoExistsOrThrowCommand]] = PartialFunction.empty
def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = PartialFunction.empty
def readLinesCommand: PartialFunction[Path, Try[IoReadLinesCommand]] = PartialFunction.empty
}
Expand Down Expand Up @@ -94,6 +95,9 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
def existsCommand(file: Path): Try[IoExistsCommand] =
buildOrDefault(_.existsCommand, file, DefaultIoExistsCommand(file))

def existsOrThrowCommand(file: Path): Try[IoExistsOrThrowCommand] =
buildOrDefault(_.existsOrThrowCommand, file, DefaultIoExistsOrThrowCommand(file))

def isDirectoryCommand(file: Path): Try[IoIsDirectoryCommand] =
buildOrDefault(_.isDirectoryCommand, file, DefaultIoIsDirectoryCommand(file))

Expand Down
9 changes: 9 additions & 0 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import net.ceedubs.ficus.readers.ValueReader

import java.io._
import java.nio.charset.StandardCharsets
import java.nio.file.NoSuchFileException
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -78,6 +79,7 @@ class NioFlow(parallelism: Int,
case hashCommand: IoHashCommand => hash(hashCommand) map hashCommand.success
case touchCommand: IoTouchCommand => touch(touchCommand) map touchCommand.success
case existsCommand: IoExistsCommand => exists(existsCommand) map existsCommand.success
case existsOrThrowCommand: IoExistsOrThrowCommand => existsOrThrow(existsOrThrowCommand) map existsOrThrowCommand.success
case readLinesCommand: IoReadLinesCommand => readLines(readLinesCommand) map readLinesCommand.success
case isDirectoryCommand: IoIsDirectoryCommand => isDirectory(isDirectoryCommand) map isDirectoryCommand.success
case _ => IO.raiseError(new UnsupportedOperationException("Method not implemented"))
Expand Down Expand Up @@ -183,6 +185,13 @@ class NioFlow(parallelism: Int,
exists.file.exists
}

private def existsOrThrow(exists: IoExistsOrThrowCommand) = IO {
exists.file.exists match {
case false => throw new NoSuchFileException(exists.file.toString)
case true => true
}
}

private def readLines(exists: IoReadLinesCommand) = IO {
exists.file.withReader { reader =>
LazyList.continually(reader.readLine()).takeWhile(_ != null).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder
override def existsCommand: PartialFunction[Path, Try[S3BatchExistsCommand]] = { case path: S3Path =>
Try(S3BatchExistsCommand(path))
}

override def existsOrThrowCommand: PartialFunction[Path, Try[S3BatchExistsOrThrowCommand]] = { case path: S3Path =>
Try(S3BatchExistsOrThrowCommand(path))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ import cromwell.core.io.{
IoCopyCommand,
IoDeleteCommand,
IoExistsCommand,
IoExistsOrThrowCommand,
IoHashCommand,
IoSizeCommand,
IoTouchCommand
}

import cromwell.filesystems.s3.S3Path
import java.nio.file.NoSuchFileException

/**
* Io commands with S3 paths and some logic enabling batching of request.
Expand Down Expand Up @@ -143,3 +145,20 @@ case class S3BatchExistsCommand(override val file: S3Path)
}
override def commandDescription: String = s"S3BatchExistsCommand file '$file'"
}

/**
* `IoCommand` to determine the existence of an object in S3, but throws execption if the object can't be found
* @param file the path to the object
*/
case class S3BatchExistsOrThrowCommand(override val file: S3Path)
extends IoExistsOrThrowCommand(file)
with S3BatchHeadCommand[Boolean] {
override def mapResponse(response: HeadObjectResponse): Boolean = true
override def onFailure(error: SdkException): Option[Left[Boolean, Nothing]] =
// If the object can't be found, fail the request and throw an exception
error match {
case _: NoSuchKeyException => throw new NoSuchFileException(file.toString)
case _ => None
}
override def commandDescription: String = s"S3BatchExistsCommand file '$file'"
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,18 +427,17 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
// need to make md5sum?
val md5_cmd = if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) {
Log.debug("Add cmd to create MD5 sibling.")
// this does NOT regenerate the md5 in case the file is overwritten !
// TODO : Add check for file age => recreate if md5 older than main file
// generate MD5 if missing or if local file is newer than sibling md5
s"""
|if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]] ; then
|if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then
| # the glob list
| md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 );
| # globbed files, using specified number of cpus for parallel processing.
SAVEIFS="$$IFS"
|IFS=$$'\n'
| SAVEIFS="$$IFS"
| IFS=$$'\n'
| cat "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" | xargs -I% -P${runtimeAttributes.cpu.##.toString} bash -c "md5sum ${globDirectory}/% > ${globDirectory}/%.md5"
| IFS="$$SAVEIFS"
|fi
|IFS="$$SAVEIFS"
|""".stripMargin
}
// return combined result
Expand All @@ -461,7 +460,6 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
Log.debug("output Data on working disk mount" + output.local.pathAsString)
s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" """.stripMargin

// file(name (full path), s3key (delocalized path), local (file basename), mount (disk details))
// files on EFS mounts are optionally delocalized.
case output: AwsBatchFileOutput if efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get =>
Log.debug("EFS output file detected: "+ output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}")
Expand All @@ -478,9 +476,9 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
// need to make md5sum?
var md5_cmd = ""
if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) {
Log.debug("Add cmd to create MD5 sibling.")
Log.debug("Add cmd to create MD5 sibling if missing or outdated.")
md5_cmd = s"""
|if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]] ; then
|if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then
| md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 );
|fi
|""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import java.security.MessageDigest
import org.apache.commons.lang3.builder.{ToStringBuilder, ToStringStyle}
import org.slf4j.{Logger, LoggerFactory}
import wdl4s.parser.MemoryUnit
import wom.format.MemorySize

/**
* Responsible for the creation of the job definition.
Expand Down Expand Up @@ -164,8 +165,8 @@ trait AwsBatchJobDefinitionBuilder {
efsMakeMD5: Boolean,
tagResources: Boolean,
logGroupName: String,
sharedMemorySize: Int): String = {
s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:$logGroupName"
sharedMemorySize: MemorySize): String = {
s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:$logGroupName:${sharedMemorySize.to(MemoryUnit.MB).amount.toInt}"
}

val environment = List.empty[KeyValuePair]
Expand Down Expand Up @@ -201,9 +202,8 @@ trait AwsBatchJobDefinitionBuilder {
efsMakeMD5,
tagResources,
logGroupName,
context.runtimeAttributes.sharedMemorySize.value
context.runtimeAttributes.sharedMemorySize
)

// To reuse job definition for gpu and gpu-runs, we will create a job definition that does not gpu requirements
// since aws batch does not allow you to set gpu as 0 when you dont need it. you will always need cpu and memory
(ContainerProperties.builder()
Expand All @@ -219,8 +219,8 @@ trait AwsBatchJobDefinitionBuilder {
.environment(environment.asJava)
.ulimits(ulimits.asJava)
.linuxParameters(
LinuxParameters.builder().sharedMemorySize(context.runtimeAttributes.sharedMemorySize.##).build()
),
LinuxParameters.builder().sharedMemorySize(context.runtimeAttributes.sharedMemorySize.to(MemoryUnit.MB).amount.toInt).build() // Convert MemorySize to MB
),
containerPropsName)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import wom.RuntimeAttributesKeys
import wom.format.MemorySize
import wdl4s.parser.MemoryUnit
import wom.types._
import wom.values._
import com.typesafe.config.{ConfigException, ConfigValueFactory}

import scala.util.matching.Regex
import org.slf4j.{Logger, LoggerFactory}
import wom.RuntimeAttributesKeys.{GpuKey, sharedMemoryKey}
import wom.RuntimeAttributesKeys.{GpuKey } // , sharedMemoryKey}

import scala.util.{Failure, Success, Try}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -94,7 +95,7 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive,
ulimits: Vector[Map[String, String]],
efsDelocalize: Boolean,
efsMakeMD5 : Boolean,
sharedMemorySize: Int Refined Positive,
sharedMemorySize: MemorySize,
logGroupName: String,
additionalTags: Map[String, String],
fileSystem: String= "s3",
Expand All @@ -110,7 +111,7 @@ object AwsBatchRuntimeAttributes {

val awsBatchEvaluateOnExitKey = "awsBatchEvaluateOnExit"

val defaultSharedMemorySize = WomInteger(64)
val defaultSharedMemorySize = MemorySize(64, MemoryUnit.MB)

private val awsBatchEvaluateOnExitDefault = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue])))

Expand Down Expand Up @@ -179,9 +180,10 @@ object AwsBatchRuntimeAttributes {
noAddressValidationInstance
.withDefault(noAddressValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse NoAddressDefaultValue)

private def sharedMemorySizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Refined[Int, Positive]] = {
SharedMemorySizeValidation(sharedMemoryKey).withDefault(
SharedMemorySizeValidation(sharedMemoryKey).configDefaultWomValue(runtimeConfig).getOrElse(defaultSharedMemorySize)
private def sharedMemorySizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = {
MemoryValidation.withDefaultMemory(
RuntimeAttributesKeys.sharedMemoryKey,
MemoryValidation.configDefaultString(RuntimeAttributesKeys.sharedMemoryKey, runtimeConfig) getOrElse defaultSharedMemorySize.toString
)
}

Expand Down Expand Up @@ -337,7 +339,7 @@ object AwsBatchRuntimeAttributes {
val efsDelocalize: Boolean = RuntimeAttributesValidation.extract(awsBatchefsDelocalizeValidation(runtimeAttrsConfig),validatedRuntimeAttributes)
val efsMakeMD5: Boolean = RuntimeAttributesValidation.extract(awsBatchefsMakeMD5Validation(runtimeAttrsConfig),validatedRuntimeAttributes)
val tagResources: Boolean = RuntimeAttributesValidation.extract(awsBatchtagResourcesValidation(runtimeAttrsConfig),validatedRuntimeAttributes)
val sharedMemorySize: Int Refined Positive = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
val sharedMemorySize: MemorySize = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes)

new AwsBatchRuntimeAttributes(
cpu,
Expand Down Expand Up @@ -708,12 +710,6 @@ class AwsBatchtagResourcesValidation(key: String) extends BooleanRuntimeAttribut
override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be a Boolean"
}

object SharedMemorySizeValidation {
def apply(key: String): SharedMemorySizeValidation = new SharedMemorySizeValidation(key)
}

class SharedMemorySizeValidation(key: String) extends PositiveIntRuntimeAttributesValidation(key)

object UlimitsValidation
extends RuntimeAttributesValidation[Vector[Map[String, String]]] {
override def key: String = AwsBatchRuntimeAttributes.UlimitsKey
Expand Down
Loading
Loading