WX-769 disks compatibility for TES backend #6991

Merged · 6 commits · Jan 31, 2023
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -60,6 +60,12 @@ This release removes the `cwl` top-level artifact. Some nonfunctional references

For more information, see the [Cromwell 79 release notes](https://github.com/broadinstitute/cromwell/releases/tag/79).

### TES backend supports `disks` attribute
Collaborator comment: One thousand points for remembering to update the changelog!


Cromwell now attempts to translate `disks` attributes [written for GCP](https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#disks) into valid `disk` attributes for TES.

For information on supported conversions, refer to the [TES documentation](https://cromwell.readthedocs.io/en/stable/backends/TES/).

## 84 Release Notes

### CromIAM enabled user checks
2 changes: 2 additions & 0 deletions build.sbt
@@ -239,6 +239,8 @@ lazy val tesBackend = (project in backendRoot / "tes")
.dependsOn(ftpFileSystem)
.dependsOn(drsFileSystem)
.dependsOn(azureBlobFileSystem)
// TES backend provides a compatibility layer to run WDLs with PAPI runtime attributes [WX-769]
.dependsOn(googlePipelinesCommon)
Collaborator comment: I know we discussed moving this disk-parsing stuff out to a common library, and also that you were running into some build issues - did that end up being harder than expected?

Author reply:
The broad choices were:

  1. Duplicate Scala code
  2. Import "common" but actually GCP-specific code from the existing project
  3. Pull apart Scala code and put "pure" non-GCP parts in a new common library

At standup I referred to issues with (1); once I got (2) to work, I was satisfied. (3) is possible but expands the scope quite a bit.

.dependsOn(backend % "test->test")
.dependsOn(common % "test->test")

45 changes: 33 additions & 12 deletions docs/RuntimeAttributes.md
@@ -30,18 +30,29 @@ workflow jes_workflow {

Cromwell recognizes certain runtime attributes and has the ability to format these for some [Backends](/backends/Backends). See the table below for common attributes that apply to _most_ backends.

| Runtime Attribute | LOCAL | Google Cloud | AWS Batch | HPC |
| -------------------- |:-----:|:-----:|:-----:|:------:|
| [cpu](#cpu) | | x | x | `cpu` |
| [memory](#memory) | | x | x | `memory_mb` / `memory_gb` |
| [disks](#disks) | | x | | * |
| [docker](#docker) | x | x | x | `docker` (see below) |
| [maxRetries](#maxretries) | x | x | x | * |
| [continueOnReturnCode](#continueonreturncode) | x | x | x | * |
| [failOnStderr](#failonstderr) | x | x | x | * |


> `*` The HPC [Shared Filesystem backend](/backends/HPC#shared-filesystem) (SFS) is fully configurable and any number of attributes can be exposed. Cromwell recognizes some of these attributes (`cpu`, `memory` and `docker`) and parses them into the attribute listed in the table which can be used within the HPC backend configuration.
| Runtime Attribute | Local | Google Cloud | TES | AWS Batch | HPC |
|-------------------------------------------------|:-----:|:------------:|-----------|:---------:|:-------------------------:|
| [`cpu`](#cpu) | | ✅ | | ✅ | `cpu` |
| [`memory`](#memory) | | ✅ | | ✅ | `memory_mb` / `memory_gb` |
| [`disks`](#disks) | | ✅ | ⚠️ Note 1 | ⚠️ Note 2 | ℹ️ Note 3 |
| [`disk`](#disk) | | | ✅ | | |
| [`docker`](#docker) | ✅ | ✅ | | ✅ | `docker` ℹ️ Note 3 |
| [`maxRetries`](#maxretries) | ✅ | ✅ | | ✅ | ℹ️ Note 3 |
| [`continueOnReturnCode`](#continueonreturncode) | ✅ | ✅ | | ✅ | ℹ️ Note 3 |
| [`failOnStderr`](#failonstderr) | ✅ | ✅ | | ✅ | ℹ️ Note 3 |


> **Note 1**
>
> Partial support. See [TES documentation](/backends/TES) for details.

> **Note 2**
>
> Partial support. See [`disks`](#disks) for details.

> **Note 3**
>
> The HPC [Shared Filesystem backend](/backends/HPC#shared-filesystem) (SFS) is fully configurable and any number of attributes can be exposed. Cromwell recognizes some of these attributes (`cpu`, `memory` and `docker`) and parses them into the attribute listed in the table which can be used within the HPC backend configuration.


### Google Cloud Specific Attributes
@@ -206,6 +217,16 @@ runtime {
}
```

### `disk`

Specific to the TES backend, sets the `disk_gb` resource.

```
runtime {
disk: "25 GB"
}
```

### `docker`

When specified, Cromwell will run your task within the specified Docker image.
27 changes: 20 additions & 7 deletions docs/backends/TES.md
@@ -1,4 +1,4 @@
**TES Backend**
## TES Backend

The TES backend submits jobs to a server that complies with the protocol described by the [GA4GH schema](https://github.com/ga4gh/task-execution-schemas).

@@ -19,7 +19,7 @@ echo $? > rc

`<container_call_root>` would be equal to the runtime attribute `dockerWorkingDir` or `/cromwell-executions/<workflow_uuid>/call-<call_name>/execution` if this attribute is not supplied.

**Configuring**
### Configuring

Configuring the TES backend is straightforward; one must only provide the TES API endpoint for the service.

@@ -40,18 +40,18 @@ backend {
}
```

**Supported File Systems**
### Supported File Systems

Currently this backend only works with files on a Local or Shared File System.

**Docker**
### Docker

This backend supports the following optional [Runtime Attributes](../RuntimeAttributes) and [Workflow Options](../wf_options/Overview/) for working with Docker:

* `docker`: Docker image to use such as "Ubuntu".
* `dockerWorkingDir`: defines the working directory in the container.

**CPU, Memory and Disk**
### CPU, Memory and Disk

This backend supports CPU, memory and disk size configuration through the use of the following [Runtime Attributes](../RuntimeAttributes) and [Workflow Options](../wf_options/Overview/):

@@ -61,16 +61,29 @@ This backend supports CPU, memory and disk size configuration through the use of
* Type: String (ex: "4 GB" or "4096 MB")
* `disk` defines the amount of disk to use.
* Type: String (ex: "1 GB" or "1024 MB")
* `disks` accepts a GCP-style disk declaration and attempts to translate it for use on TES
* See table below for supported translations
* `preemptible` defines whether or not to use preemptible VMs.
* Type: Boolean (ex: "true" or "false")
* Integers are accepted and will be converted to boolean (true if > 0)

If they are not set, the TES backend may use default values.

**Azure**
#### GCP `disks` to TES `disk` compatibility

| GCP `disks` value | Supported | TES translation | Remark |
|---------------------------------------|-----------|-----------------|-----------------------------------|
| `local-disk 25 HDD` | ✅ | 25 GB disk | |
| `local-disk 25 SSD` | ✅ | 25 GB disk | Disk type info is dropped |
| `/some/mnt 25 SSD` | ❌ | | Custom mount points not supported |
| `local-disk 25 HDD, /some/mnt 50 SSD` | ❌ | | Multiple disks are not supported |

> Note: if both `disk` and `disks` attributes are specified, the TES backend will automatically use the value in `disk` and not attempt to translate `disks`.
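
To make the table concrete, here is a minimal, standalone sketch of these translation rules. It is illustrative only and is not the code added by this PR (which reuses the PAPI disk-parsing code via `DisksValidation`, see `TesRuntimeAttributes.scala` below); the object name, method, and error messages here are hypothetical.

```scala
import scala.util.Try

// Illustrative only: a standalone approximation of the `disks` -> `disk` translation rules
// in the table above. Cromwell itself parses `disks` with the PAPI disk-validation code and
// then adapts the result; this sketch avoids those classes entirely.
object DisksToTesDiskSketch {

  /** Returns the usable disk size in GB, or an error message for unsupported declarations. */
  def translate(disks: String): Either[String, Double] = {
    val declarations = disks.split(",").map(_.trim).filter(_.nonEmpty).toList
    declarations match {
      case single :: Nil =>
        single.split("\\s+").toList match {
          // "local-disk SIZE TYPE": the disk type (HDD/SSD) is dropped in translation
          case "local-disk" :: size :: _ :: Nil =>
            Try(size.toDouble).toOption.toRight(s"Could not parse disk size: '$size'")
          // "/mount/point SIZE TYPE": custom mount points are rejected
          case mount :: _ :: _ :: Nil if mount.startsWith("/") =>
            Left("Disks with custom mount points are not supported by this backend")
          case _ =>
            Left(s"Unrecognized disk declaration: '$single'")
        }
      case _ :: _ =>
        Left("Expecting exactly one disk definition on this backend, found multiple")
      case Nil =>
        Left(s"Unrecognized disk declaration: '$disks'")
    }
  }
}

// Example usage (hypothetical sketch, not Cromwell's API):
//   DisksToTesDiskSketch.translate("local-disk 25 SSD")  // Right(25.0) - disk type dropped
//   DisksToTesDiskSketch.translate("/some/mnt 25 SSD")   // Left("Disks with custom mount points ...")
```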

### Azure
[Azure](Azure) is an implementation of Cromwell that uses the TES interface for orchestrating the tasks on Azure.

**TESK**
### TESK

[TESK](https://github.com/EMBL-EBI-TSI/TESK) is an implementation of the TES interface that uses Kubernetes and FTP.
When running Cromwell with a TESK backend, you will want to customize the way Cromwell processes globs, because Cromwell uses hard links by default and Kubernetes often does not handle hard links well.
TesRuntimeAttributes.scala
@@ -3,10 +3,13 @@ package cromwell.backend.impl.tes
import cats.syntax.validated._
import com.typesafe.config.Config
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.google.pipelines.common.DisksValidation
import cromwell.backend.google.pipelines.common.io.{PipelinesApiAttachedDisk, PipelinesApiEmptyMountedDisk, PipelinesApiWorkingDisk}
import cromwell.backend.standard.StandardValidatedRuntimeAttributesBuilder
import cromwell.backend.validation._
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import wdl4s.parser.MemoryUnit
import wom.RuntimeAttributesKeys
import wom.format.MemorySize
import wom.types.{WomIntegerType, WomStringType}
@@ -36,6 +39,9 @@ object TesRuntimeAttributes {

private def diskSizeValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[MemorySize] = MemoryValidation.optional(DiskSizeKey)

private def diskSizeCompatValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Seq[PipelinesApiAttachedDisk]] =
DisksValidation.optional

private def memoryValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[MemorySize] = MemoryValidation.optional(RuntimeAttributesKeys.MemoryKey)

private val dockerValidation: RuntimeAttributesValidation[String] = DockerValidation.instance
@@ -45,10 +51,14 @@ object TesRuntimeAttributes {
private def preemptibleValidation(runtimeConfig: Option[Config]) = PreemptibleValidation.default(runtimeConfig)

def runtimeAttributesBuilder(backendRuntimeConfig: Option[Config]): StandardValidatedRuntimeAttributesBuilder =
// !! NOTE !! If new validated attributes are added to TesRuntimeAttributes, be sure to include
// their validations here so that they will be handled correctly with backendParameters.
// Location 2 of 2
StandardValidatedRuntimeAttributesBuilder.default(backendRuntimeConfig).withValidation(
cpuValidation(backendRuntimeConfig),
memoryValidation(backendRuntimeConfig),
diskSizeValidation(backendRuntimeConfig),
diskSizeCompatValidation(backendRuntimeConfig),
dockerValidation,
dockerWorkingDirValidation,
preemptibleValidation(backendRuntimeConfig),
@@ -71,13 +81,45 @@ object TesRuntimeAttributes {
Map.empty
}

private def detectDiskFormat(backendRuntimeConfig: Option[Config], validatedRuntimeAttributes: ValidatedRuntimeAttributes): Option[MemorySize] = {

def adaptPapiDisks(disks: Seq[PipelinesApiAttachedDisk]): MemorySize = {
disks match {
case disk :: Nil if disk.isInstanceOf[PipelinesApiWorkingDisk] =>
MemorySize(disk.sizeGb.toDouble, MemoryUnit.GB)
case _ :: _ =>
// When a user specifies only a custom disk, we add the default disk in the background, so we technically have multiple disks.
// But we don't want to confuse the user with `multiple disks` message when they only put one.
if (disks.exists(_.isInstanceOf[PipelinesApiEmptyMountedDisk]))
throw new IllegalArgumentException("Disks with custom mount points are not supported by this backend")
else
// Multiple `local-disk` is not legal, but possible and should be detected
throw new IllegalArgumentException("Expecting exactly one disk definition on this backend, found multiple")
}
}

val maybeTesDisk: Option[MemorySize] =
RuntimeAttributesValidation.extractOption(diskSizeValidation(backendRuntimeConfig).key, validatedRuntimeAttributes)
val maybePapiDisk: Option[Seq[PipelinesApiAttachedDisk]] =
RuntimeAttributesValidation.extractOption(diskSizeCompatValidation(backendRuntimeConfig).key, validatedRuntimeAttributes)

(maybeTesDisk, maybePapiDisk) match {
case (Some(tesDisk: MemorySize), _) =>
Option(tesDisk) // If WDLs are in circulation with both `disk` and `disks`, pick the one intended for this backend
case (None, Some(papiDisks: Seq[PipelinesApiAttachedDisk])) =>
Option(adaptPapiDisks(papiDisks))
case _ =>
None
}
}

def apply(validatedRuntimeAttributes: ValidatedRuntimeAttributes, rawRuntimeAttributes: Map[String, WomValue], config: TesConfiguration): TesRuntimeAttributes = {
val backendRuntimeConfig = config.runtimeConfig
val docker: String = RuntimeAttributesValidation.extract(dockerValidation, validatedRuntimeAttributes)
val dockerWorkingDir: Option[String] = RuntimeAttributesValidation.extractOption(dockerWorkingDirValidation.key, validatedRuntimeAttributes)
val cpu: Option[Int Refined Positive] = RuntimeAttributesValidation.extractOption(cpuValidation(backendRuntimeConfig).key, validatedRuntimeAttributes)
val memory: Option[MemorySize] = RuntimeAttributesValidation.extractOption(memoryValidation(backendRuntimeConfig).key, validatedRuntimeAttributes)
val disk: Option[MemorySize] = RuntimeAttributesValidation.extractOption(diskSizeValidation(backendRuntimeConfig).key, validatedRuntimeAttributes)
val disk: Option[MemorySize] = detectDiskFormat(backendRuntimeConfig, validatedRuntimeAttributes)
val failOnStderr: Boolean =
RuntimeAttributesValidation.extract(failOnStderrValidation(backendRuntimeConfig), validatedRuntimeAttributes)
val continueOnReturnCode: ContinueOnReturnCode =
@@ -87,12 +129,14 @@

// !! NOTE !! If new validated attributes are added to TesRuntimeAttributes, be sure to include
// their validations here so that they will be handled correctly with backendParameters.
// Location 1 of 2
val validations = Set(
dockerValidation,
dockerWorkingDirValidation,
cpuValidation(backendRuntimeConfig),
memoryValidation(backendRuntimeConfig),
diskSizeValidation(backendRuntimeConfig),
diskSizeCompatValidation(backendRuntimeConfig),
failOnStderrValidation(backendRuntimeConfig),
continueOnReturnCodeValidation(backendRuntimeConfig),
preemptibleValidation(backendRuntimeConfig)
TesRuntimeAttributesSpec.scala
@@ -160,6 +160,54 @@ class TesRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeoutSpec
assertFailure(runtimeAttributes, "Expecting disk runtime attribute to be an Integer or String with format '8 GB'")
}

"parse an HDD definition" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString("local-disk 10 HDD"))
val expectedRuntimeAttributes = expectedDefaults.copy(disk = Option(MemorySize.parse("10 GB").get))
assertSuccess(runtimeAttributes, expectedRuntimeAttributes)
}

"parse an SSD definition" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString("local-disk 10 SSD"))
val expectedRuntimeAttributes = expectedDefaults.copy(disk = Option(MemorySize.parse("10 GB").get))
assertSuccess(runtimeAttributes, expectedRuntimeAttributes)
}

"refuse multiple `local-disk` instances" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString("local-disk 10 SSD, local-disk 20 SSD"))
assertFailure(runtimeAttributes, "Expecting exactly one disk definition on this backend, found multiple")
}

"refuse custom mount points" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString("/some/mnt 20 SSD"))
assertFailure(runtimeAttributes, "Disks with custom mount points are not supported by this backend")
}

"refuse custom AND multiple mount points" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString("/mnt/tmp 10 LOCAL, local-disk 20 HDD"))
assertFailure(runtimeAttributes, "Disks with custom mount points are not supported by this backend")
}

"not accept a single comma" ignore {
// Surprisingly, the PAPI code we call under the covers validates `,` and gives the user a default disk.
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString(","))
assertFailure(runtimeAttributes, "Disk strings should be of the format 'local-disk SIZE TYPE' or '/mount/point SIZE TYPE' but got: ','")
}

"not accept empty string" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString(""))
assertFailure(runtimeAttributes, "Disk strings should be of the format 'local-disk SIZE TYPE' or '/mount/point SIZE TYPE' but got: ''")
}

"not accept `banana`" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomString("banana"))
assertFailure(runtimeAttributes, "Disk strings should be of the format 'local-disk SIZE TYPE' or '/mount/point SIZE TYPE' but got: 'banana'")
}

"not accept a random number (chosen by fair dice roll)" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "disks" -> WomInteger(4))
assertFailure(runtimeAttributes, "Expecting disks runtime attribute to be a comma separated String or Array[String]")
}

"validate a valid dockerWorkingDir entry" in {
val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "dockerWorkingDir" -> WomString("/tmp"))
val expectedRuntimeAttributes = expectedDefaults.copy(dockerWorkingDir = Option("/tmp"))