Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shared memory, fix relative path in glob, fix UT #45

Draft
wants to merge 7 commits into
base: develop_aws
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ object CommonBackendConfigurationAttributes {
"default-runtime-attributes.awsBatchRetryAttempts",
"default-runtime-attributes.maxRetries",
"default-runtime-attributes.awsBatchEvaluateOnExit",
"default-runtime-attributes.sharedMemorySize",
"default-runtime-attributes.ulimits",
"default-runtime-attributes.efsDelocalize",
"default-runtime-attributes.efsMakeMD5",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpres
import wom.expression.WomExpression
import wom.graph.LocalName
import wom.values._
import wom.types._
import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper}
import java.net.URLDecoder
import scala.concurrent._
Expand Down Expand Up @@ -171,7 +170,7 @@ trait StandardAsyncExecutionActor
// decode url encoded values generated by the mapper
val decoded_womValue = URLDecoder.decode(mapped_wom.valueString,"UTF-8")
// convert to womfile again
WomFile(WomSingleFileType, decoded_womValue)
WomFile(womFile.womFileType, decoded_womValue)
}
)
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL

case input: AwsBatchFileInput if input.s3key.startsWith("s3://") =>
// regular s3 objects : download to working dir.
s"""_s3_localize_with_retry "${input.s3key}" "${input.mount.mountPoint.pathAsString}/${input.local}" """.stripMargin
s"""_s3_localize_with_retry "${input.s3key}" "${input.mount.mountPoint.pathAsString}/${input.local}"""".stripMargin
.replace(AwsBatchWorkingDisk.MountPoint.pathAsString, workDir)

case input: AwsBatchFileInput if efsMntPoint.isDefined && input.s3key.startsWith(efsMntPoint.get) =>
Expand Down Expand Up @@ -170,6 +170,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL

val workflowId = invalidCharsPattern.replaceAllIn(jobDescriptor.workflowDescriptor.rootWorkflowId.toString,"_")
val workflowName = invalidCharsPattern.replaceAllIn(jobDescriptor.workflowDescriptor.rootWorkflow.name.toString,"_")

val taskId = invalidCharsPattern.replaceAllIn(jobDescriptor.key.call.fullyQualifiedName + "-" + jobDescriptor.key.index + "-" + jobDescriptor.key.attempt,"_")
val doTagging = tagResources.getOrElse(false) // development : always tag resources.
//val tags: Map[String,String] = Map("cromwell-workflow-name" -> workflowName, "cromwell-workflow-id" -> workflowId, "cromwell-task-id" -> taskId)
Expand Down Expand Up @@ -202,7 +203,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" ||
| { echo "attempt $$i to copy $$s3_path failed" && sleep $$((7 * "$$i")) && continue; }
| # check data integrity
| _check_data_integrity "$$destination" "$$s3_path" ||
| _check_data_integrity "$$destination" "$$s3_path" ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| # copy succeeded
| break
Expand Down Expand Up @@ -239,7 +240,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| break
| fi
| # if destination is not a bucket : abort
| if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then
| if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then
| echo "$$destination is not an S3 path with a bucket and key."
| DELOCALIZATION_FAILED=1
| break
Expand All @@ -249,21 +250,21 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| # make sure to strip the trailing / in destination
| destination=$${destination%/}
| # glob directory. do recursive copy
| $awsCmd s3 cp --no-progress "$$local_path" "$$destination" --recursive --exclude "cromwell_glob_control_file" ||
| { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| $awsCmd s3 cp --no-progress "$$local_path" "$$destination" --recursive --exclude "cromwell_glob_control_file" ||
| { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| # check integrity for each of the files (allow spaces)
| SAVEIFS="$$IFS"
| IFS=$$'\n'
| for FILE in $$(cd "$$local_path" ; ls | grep -v cromwell_glob_control_file); do
| _check_data_integrity "$$local_path/$$FILE" "$$destination/$$FILE" ||
| _check_data_integrity "$$local_path/$$FILE" "$$destination/$$FILE" ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; }
| done
| IFS="$$SAVEIFS"
| else
| $awsCmd s3 cp --no-progress "$$local_path" "$$destination" ||
| { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| else
| $awsCmd s3 cp --no-progress "$$local_path" "$$destination" ||
| { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| # check content length for data integrity
| _check_data_integrity "$$local_path" "$$destination" ||
| _check_data_integrity "$$local_path" "$$destination" ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| fi
| # copy succeeded
Expand All @@ -274,7 +275,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|function _get_multipart_chunk_size() {
| local file_path="$$1"
| # file size
| file_size=$$(stat --printf="%s" "$$file_path")
| file_size=$$(stat --printf="%s" "$$file_path")
| # chunk_size : you can have at most 10K parts with at least one 5MB part
| # this reflects the formula in s3-copy commands of cromwell (S3FileSystemProvider.java)
| # => long partSize = Math.max((objectSize / 10000L) + 1, 5 * 1024 * 1024);
Expand All @@ -287,9 +288,9 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|function _check_data_integrity() {
| local local_path="$$1"
| local s3_path="$$2"
|
|
| # remote : use content_length
| if [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then
| if [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then
| bucket="$${BASH_REMATCH[1]}"
| key="$${BASH_REMATCH[2]}"
| else
Expand Down Expand Up @@ -356,7 +357,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|if [[ "${doTagging}" == "true" ]]; then
| echo "*** TAGGING RESOURCES ***"
| _add_tags
|fi
|fi
|
|echo '*** LOCALIZING INPUTS ***'
|if [ ! -d $workDir ]; then mkdir $workDir && chmod 777 $workDir; fi
Expand All @@ -367,7 +368,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|if [[ $$LOCALIZATION_FAILED -eq 1 ]]; then
| echo '*** LOCALIZATION FAILED ***'
| exit 1
|else
|else
| echo '*** COMPLETED LOCALIZATION ***'
|fi
|set +e
Expand Down Expand Up @@ -451,7 +452,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString =>
//output is on working disk mount
Log.debug("output Data on working disk mount" + output.local.pathAsString)
s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" """.stripMargin
s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}"""".stripMargin

// file(name (full path), s3key (delocalized path), local (file basename), mount (disk details))
// files on EFS mounts are optionally delocalized.
Expand Down Expand Up @@ -481,6 +482,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
}
// return combined result
s"""
| # changes made
|${test_cmd}
|${md5_cmd}
| """.stripMargin
Expand All @@ -489,10 +491,11 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
//output on a different mount
Log.debug("output data on other mount")
Log.debug(output.toString)
s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" """.stripMargin
s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}"""".stripMargin
case _ => ""
}.mkString("\n") + "\n" +
s"""
| # there hsould be some change here
|if [ -f "$workDir/${jobPaths.returnCodeFilename}" ]; then _s3_delocalize_with_retry "$workDir/${jobPaths.returnCodeFilename}" "${jobPaths.callRoot.pathAsString}/${jobPaths.returnCodeFilename}" ; fi
|if [ -f "$stdErr" ]; then _s3_delocalize_with_retry "$stdErr" "${jobPaths.standardPaths.error.pathAsString}"; fi
|if [ -f "$stdOut" ]; then _s3_delocalize_with_retry "$stdOut" "${jobPaths.standardPaths.output.pathAsString}"; fi
Expand All @@ -508,7 +511,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|if [[ "${doTagging}" == "true" ]]; then
| echo "*** TAGGING RESOURCES ***"
| _add_tags
|fi
|fi
|
|echo '*** DELOCALIZING OUTPUTS ***'
|DELOCALIZATION_FAILED=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ package cromwell.backend.impl.aws
import scala.collection.mutable.ListBuffer
import cromwell.backend.BackendJobDescriptor
import cromwell.backend.io.JobPaths
import software.amazon.awssdk.services.batch.model.{ContainerProperties, EvaluateOnExit, Host, KeyValuePair, MountPoint, ResourceRequirement, ResourceType, RetryAction, RetryStrategy, Ulimit, Volume}
import software.amazon.awssdk.services.batch.model.{ContainerProperties, EvaluateOnExit, Host, KeyValuePair, LinuxParameters, MountPoint, ResourceRequirement, ResourceType, RetryAction, RetryStrategy, Ulimit, Volume}
import cromwell.backend.impl.aws.io.AwsBatchVolume

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -153,8 +153,8 @@ trait AwsBatchJobDefinitionBuilder {
).toList
}

def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit], efsDelocalize: Boolean, efsMakeMD5: Boolean, tagResources: Boolean): String = {
s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}"
/**
  * Derives a deterministic, content-addressed name for a Batch job definition so that
  * identical container configurations can reuse the same registered definition.
  * Every input that affects the container properties is folded into the name;
  * any change to one of them therefore yields a distinct name.
  */
def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit], efsDelocalize: Boolean, efsMakeMD5: Boolean, tagResources: Boolean, sharedMemorySize: Int): String = {
  // Render each collection as a comma-joined segment, then join all segments with ':' —
  // identical output to a single large interpolated string, but easier to extend.
  val segments = List(
    imageName,
    packedCommand,
    volumes.map(_.toString).mkString(","),
    mountPoints.map(_.toString).mkString(","),
    env.map(_.toString).mkString(","),
    ulimits.map(_.toString).mkString(","),
    efsDelocalize.toString,
    efsMakeMD5.toString,
    tagResources.toString,
    sharedMemorySize.toString
  )
  segments.mkString(":")
}

val environment = List.empty[KeyValuePair]
Expand All @@ -179,7 +179,8 @@ trait AwsBatchJobDefinitionBuilder {
ulimits,
efsDelocalize,
efsMakeMD5,
tagResources
tagResources,
context.runtimeAttributes.sharedMemorySize.value
)

// To reuse job definition for gpu and gpu-runs, we will create a job definition that does not gpu requirements
Expand All @@ -194,8 +195,12 @@ trait AwsBatchJobDefinitionBuilder {
.volumes(volumes.asJava)
.mountPoints(mountPoints.asJava)
.environment(environment.asJava)
.ulimits(ulimits.asJava),
.ulimits(ulimits.asJava)
.linuxParameters(
LinuxParameters.builder().sharedMemorySize(context.runtimeAttributes.sharedMemorySize.##).build()
),
containerPropsName)

}

def retryStrategyBuilder(context: AwsBatchJobDefinitionContext): (RetryStrategy.Builder, String) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import com.typesafe.config.{ConfigException, ConfigValueFactory}

import scala.util.matching.Regex
import org.slf4j.{Logger, LoggerFactory}
import wom.RuntimeAttributesKeys.GpuKey
import wom.RuntimeAttributesKeys.{GpuKey, sharedMemoryKey}

import scala.util.{Failure, Success, Try}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -91,8 +91,10 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive,
ulimits: Vector[Map[String, String]],
efsDelocalize: Boolean,
efsMakeMD5 : Boolean,
fileSystem: String= "s3",
tagResources: Boolean = false)
sharedMemorySize: Int Refined Positive,
fileSystem: String = "s3",
tagResources: Boolean = false
)

object AwsBatchRuntimeAttributes {
val Log: Logger = LoggerFactory.getLogger(this.getClass)
Expand All @@ -103,6 +105,8 @@ object AwsBatchRuntimeAttributes {
val awsBatchRetryAttemptsKey = "awsBatchRetryAttempts"

val awsBatchEvaluateOnExitKey = "awsBatchEvaluateOnExit"

val defaultSharedMemorySize = WomInteger(64)
private val awsBatchEvaluateOnExitDefault = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue])))


Expand Down Expand Up @@ -151,6 +155,12 @@ object AwsBatchRuntimeAttributes {
MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue)
}

/**
  * Builds the validation for the shared-memory-size runtime attribute.
  * The default is taken from the backend's runtime-attributes config when present,
  * otherwise from the hard-coded `defaultSharedMemorySize`.
  */
private def sharedMemorySizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Refined[Int, Positive]] = {
  val validation = SharedMemorySizeValidation(sharedMemoryKey)
  // Config-supplied default wins; fall back to the built-in default value.
  val defaultValue = validation.configDefaultWomValue(runtimeConfig).getOrElse(defaultSharedMemorySize)
  validation.withDefault(defaultValue)
}

private def memoryMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = {
MemoryValidation.withDefaultMemory(
RuntimeAttributesKeys.MemoryMinKey,
Expand Down Expand Up @@ -243,7 +253,8 @@ object AwsBatchRuntimeAttributes {
ulimitsValidation(runtimeConfig),
awsBatchefsDelocalizeValidation(runtimeConfig),
awsBatchefsMakeMD5Validation(runtimeConfig),
awsBatchtagResourcesValidation(runtimeConfig)
awsBatchtagResourcesValidation(runtimeConfig),
sharedMemorySizeValidation(runtimeConfig),
)
def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation(
cpuValidation(runtimeConfig),
Expand All @@ -261,8 +272,9 @@ object AwsBatchRuntimeAttributes {
ulimitsValidation(runtimeConfig),
awsBatchefsDelocalizeValidation(runtimeConfig),
awsBatchefsMakeMD5Validation(runtimeConfig),
awsBatchtagResourcesValidation(runtimeConfig)
)
awsBatchtagResourcesValidation(runtimeConfig),
sharedMemorySizeValidation(runtimeConfig),
)

configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => validationsS3backend
Expand Down Expand Up @@ -293,6 +305,7 @@ object AwsBatchRuntimeAttributes {
val efsDelocalize: Boolean = RuntimeAttributesValidation.extract(awsBatchefsDelocalizeValidation(runtimeAttrsConfig),validatedRuntimeAttributes)
val efsMakeMD5: Boolean = RuntimeAttributesValidation.extract(awsBatchefsMakeMD5Validation(runtimeAttrsConfig),validatedRuntimeAttributes)
val tagResources: Boolean = RuntimeAttributesValidation.extract(awsBatchtagResourcesValidation(runtimeAttrsConfig),validatedRuntimeAttributes)
val sharedMemorySize: Int Refined Positive = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
new AwsBatchRuntimeAttributes(
cpu,
gpuCount,
Expand All @@ -310,8 +323,9 @@ object AwsBatchRuntimeAttributes {
ulimits,
efsDelocalize,
efsMakeMD5,
sharedMemorySize,
fileSystem,
tagResources
tagResources,
)
}
}
Expand Down Expand Up @@ -664,6 +678,12 @@ class AwsBatchtagResourcesValidation(key: String) extends BooleanRuntimeAttribut
override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be a Boolean"
}

/** Factory for [[SharedMemorySizeValidation]]. */
object SharedMemorySizeValidation {
  def apply(key: String): SharedMemorySizeValidation = new SharedMemorySizeValidation(key)
}

/**
  * Validates the shared-memory-size runtime attribute as a strictly positive integer,
  * delegating all validation behavior to [[PositiveIntRuntimeAttributesValidation]].
  */
class SharedMemorySizeValidation(key: String) extends PositiveIntRuntimeAttributesValidation(key)

object UlimitsValidation
extends RuntimeAttributesValidation[Vector[Map[String, String]]] {
override def key: String = AwsBatchRuntimeAttributes.UlimitsKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ package cromwell.backend.impl.aws.io
import wom.values._
import cromwell.backend.io._
import cromwell.backend.standard._

import scala.concurrent.Future
import java.nio.file.Paths
import java.nio.file.{Path, Paths}
import cromwell.core.path.DefaultPathBuilder


Expand All @@ -56,7 +57,7 @@ trait AwsBatchGlobFunctions extends GlobFunctions {
override def glob(pattern: String): Future[Seq[String]] = {
// get access to globName()
import GlobFunctions._

// GOAL :
// - get config (backend / runtime / ...) here to evaluate if efsMntPoint is set & if efs delocalization is set.
// - according to those values : write the pattern as s3:// or as local path.
Expand All @@ -67,22 +68,25 @@ trait AwsBatchGlobFunctions extends GlobFunctions {
val wfid_regex = ".{8}-.{4}-.{4}-.{4}-.{12}".r
val wfid = callContext.root.toString.split("/").toList.filter(element => wfid_regex.pattern.matcher(element).matches()).lastOption.getOrElse("")
val globPatternName = globName(s"${pattern}-${wfid}")
val globbedDir = Paths.get(pattern).getParent.toString
val globbedDir = Paths.get(pattern).getParent match {
// remove "./" to avoid it from appearing in s3 path
case x: Path => x.toString.stripPrefix("./").stripPrefix(".")
case _ => ""
}
val listFilePath = if (pattern.startsWith("/mnt/efs/")) {
DefaultPathBuilder.get(globbedDir + "/." + globPatternName + ".list")
DefaultPathBuilder.get(globbedDir + "/." + globPatternName + ".list")
} else {
callContext.root.resolve(s"${globbedDir}/.${globPatternName}.list".stripPrefix("/"))
callContext.root.resolve(s".${globPatternName}.list".stripPrefix("/"))
}
asyncIo.readLinesAsync(listFilePath.toRealPath()) map { lines =>
lines.toList map { fileName =>
// again : this should be config based...
if (pattern.startsWith("/mnt/efs/")) {
s"${globbedDir}/.${globPatternName}/${fileName}"
s"${globbedDir}/.${globPatternName}/${fileName}".stripPrefix("/")
} else {
callContext.root.resolve(s"${globbedDir}/.${globPatternName}/${fileName}".stripPrefix("/")).pathAsString
callContext.root.resolve(s".${globPatternName}/${fileName}".stripPrefix("/")).pathAsString.stripPrefix("/")
}
}
}
}

}
Loading
Loading