Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1156] internal_path_prefix for TES 4.4 #7190

Merged
merged 15 commits into from
Aug 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@
val callInputsDockerRoot = callDockerRoot.resolve("inputs")
val callInputsRoot = callRoot.resolve("inputs")

/*
* tesTaskRoot: This is the root directory that TES will use for files related to this task.
* We provide it to TES as a k/v pair where the key is "internal_path_prefix" (specified in TesWorkflowOptionKeys.scala)
* and the value is a blob path.
* This is not a standard TES feature, but rather related to the Azure TES implementation that Terra uses.
* While passing it outside of terra won't do any harm, we could consider making this optional and/or configurable.
*/
val tesTaskRoot : Path = callExecutionRoot.resolve("tes_task")

Check warning on line 38 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala#L38

Added line #L38 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does resolve throw an exception if it's not running on Azure TES? Or is it creating-if-not-exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. It requires callExecutionRoot to at least exist, but it will create .../tes_task if that doesn't yet exist.

callExecutionRoot needs to exist in order to not throw, but that should, at the minimum, be an empty path at this point.


// Given an output path, return a path localized to the storage file system
def storageOutput(path: String): String = {
callExecutionRoot.resolve(path).toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package cromwell.backend.impl.tes

import common.collections.EnhancedCollections._
import common.util.StringUtil._
import cromwell.backend.impl.tes.OutputMode.OutputMode
Expand Down Expand Up @@ -71,7 +70,6 @@
path = tesPaths.callExecutionDockerRoot.resolve("script").toString,
`type` = Option("FILE")
)

private def writeFunctionFiles: Map[FullyQualifiedName, Seq[WomFile]] =
instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f.file) } toMap

Expand Down Expand Up @@ -231,11 +229,6 @@
workflowExecutionIdentityOption
)

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity
)

val executors = Seq(Executor(
image = dockerImageUsed,
command = Seq(jobShell, commandScript.path),
Expand All @@ -245,6 +238,12 @@
stdin = None,
env = None
))

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity,
Option(tesPaths.tesTaskRoot.pathAsString)

Check warning on line 245 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala#L242-L245

Added lines #L242 - L245 were not covered by tests
)
}

object TesTask {
Expand All @@ -254,15 +253,21 @@
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
}
def makeResources(runtimeAttributes: TesRuntimeAttributes,
workflowExecutionId: Option[String]): Resources = {

// This was added in BT-409 to let us pass information to an Azure
// TES server about which user identity to run tasks as.
// Note that we validate the type of WorkflowExecutionIdentity
// in TesInitializationActor.
val backendParameters = runtimeAttributes.backendParameters ++
workflowExecutionId: Option[String], internalPathPrefix: Option[String]): Resources = {
/*
* workflowExecutionId: This was added in BT-409 to let us pass information to an Azure
* TES server about which user identity to run tasks as.
* Note that we validate the type of WorkflowExecutionIdentity in TesInitializationActor.
*
* internalPathPrefix: Added in WX-1156 to support the azure TES implementation. Specifies
* a working directory that the TES task can use.
*/
val backendParameters : Map[String, Option[String]] = runtimeAttributes.backendParameters ++
workflowExecutionId
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
.toMap ++

Check warning on line 268 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala#L268

Added line #L268 was not covered by tests
internalPathPrefix
.map(TesWorkflowOptionKeys.InternalPathPrefix -> Option(_))

Check warning on line 270 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala#L270

Added line #L270 was not covered by tests
.toMap
val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
case Some(x) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ object TesWorkflowOptionKeys {
// Communicates to the TES server which identity the tasks should execute as
val WorkflowExecutionIdentity = "workflow_execution_identity"
val DataAccessIdentity = "data_access_identity"
val InternalPathPrefix = "internal_path_prefix"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we don't set this in workflow options, I don't think this key should be defined in this file. Maybe just move to TesTask?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops....I did not connect workflowoptions.json and TesWorkflowOptionKeys.scala until....just now.

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,87 @@ class TesTaskSpec
false,
Map.empty
)
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
val internalPathPrefix = Option("mock/path/to/tes/task")
val expectedTuple = "internal_path_prefix" -> internalPathPrefix

it should "create the correct resources when an identity is passed in WorkflowOptions" in {
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedTuple))
)
}

it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
val wei = Option("")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(""),
expectedTuple))
)
}

it should "create the correct resources when no identity is passed in WorkflowOptions" in {
val wei = None
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
)
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(expectedTuple)))
}

it should "create the correct resources when an identity is passed in via backend config" in {
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedTuple))
)
}

it should "create the correct resources when no identity is passed in via backend config" in {
val weic = None
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456"),
expectedTuple))
)
}

it should "correctly set the internal path prefix when provided as a backend parameter" in {
val wei = Option("abc123")
val internalPathPrefix = Option("mock/path/to/tes/task")
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
"internal_path_prefix" -> internalPathPrefix)
))
}

it should "correctly resolve the path to .../tes_task and add the k/v pair to backend parameters" in {
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld,
labels = Labels("foo" -> "bar"))
val jobDescriptor = jobDescriptorFromSingleCallWorkflow(workflowDescriptor,
Map.empty,
emptyWorkflowOptions,
Set.empty)
val tesPaths = TesJobPaths(jobDescriptor.key,
jobDescriptor.workflowDescriptor,
TestConfig.emptyConfig)

val expectedKey = "internal_path_prefix"
val expectedValue = Option(tesPaths.tesTaskRoot.pathAsString)

//Assert path correctly ends up in the resources
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei, expectedValue) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedKey -> expectedValue))
)
}

it should "copy labels to tags" in {
val jobLogger = mock[JobLogger]
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
Expand Down
Loading