Skip to content

Commit

Permalink
WX-859 Accept workflow execution identity in config (#6967)
Browse files Browse the repository at this point in the history
  • Loading branch information
kraefrei authored Jan 3, 2023
1 parent dce3fe7 commit 60c2c86
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ class TesConfiguration(val configurationDescriptor: BackendConfigurationDescript
val useBackendParameters =
configurationDescriptor
.backendConfig
.as[Option[Boolean]](TesConfiguration.useBackendParamtersKey)
.as[Option[Boolean]](TesConfiguration.useBackendParametersKey)
.getOrElse(false)
}

object TesConfiguration {
final val useBackendParamtersKey = "use_tes_11_preview_backend_parameters"
final val useBackendParametersKey = "use_tes_11_preview_backend_parameters"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package cromwell.backend.impl.tes
import common.collections.EnhancedCollections._
import common.util.StringUtil._
import cromwell.backend.impl.tes.OutputMode.OutputMode
import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptor, BackendWorkflowDescriptor}
import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptor}
import cromwell.core.logging.JobLogger
import cromwell.core.path.{DefaultPathBuilder, Path}
import net.ceedubs.ficus.Ficus._

import scala.language.postfixOps
import scala.util.Try

import wdl.draft2.model.FullyQualifiedName
import wdl4s.parser.MemoryUnit
import wom.InstantiatedCommand
import wom.callable.Callable.OutputDefinition
import wom.expression.NoIoFunctionSet
import wom.values._

import scala.language.postfixOps
import scala.util.Try

final case class WorkflowExecutionIdentityConfig(value: String) {override def toString: String = value.toString}
final case class WorkflowExecutionIdentityOption(value: String) {override def toString: String = value}
final case class TesTask(jobDescriptor: BackendJobDescriptor,
configurationDescriptor: BackendConfigurationDescriptor,
jobLogger: JobLogger,
Expand All @@ -32,6 +36,16 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
private val workflowDescriptor = jobDescriptor.workflowDescriptor
private val workflowName = workflowDescriptor.callable.name
private val fullyQualifiedTaskName = jobDescriptor.taskCall.fullyQualifiedName
private val workflowExecutionIdentityConfig: Option[WorkflowExecutionIdentityConfig] =
configurationDescriptor.backendConfig
.getAs[String]("workflow-execution-identity")
.map(WorkflowExecutionIdentityConfig)
private val workflowExecutionIdentityOption: Option[WorkflowExecutionIdentityOption] =
workflowDescriptor
.workflowOptions
.get(TesWorkflowOptionKeys.WorkflowExecutionIdentity)
.toOption
.map(WorkflowExecutionIdentityOption)
val name: String = fullyQualifiedTaskName
val description: String = jobDescriptor.toString

Expand Down Expand Up @@ -212,7 +226,15 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
result
}

val resources: Resources = TesTask.makeResources(runtimeAttributes, workflowDescriptor)
val preferedWorkflowExecutionIdentity = TesTask.getPreferredWorkflowExecutionIdentity(
workflowExecutionIdentityConfig,
workflowExecutionIdentityOption
)

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity
)

val executors = Seq(Executor(
image = dockerImageUsed,
Expand All @@ -226,21 +248,22 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
}

object TesTask {
// Helper to determine which source to use for a workflowExecutionIdentity
def getPreferredWorkflowExecutionIdentity(configIdentity: Option[WorkflowExecutionIdentityConfig],
workflowOptionsIdentity: Option[WorkflowExecutionIdentityOption]): Option[String] = {
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
}
def makeResources(runtimeAttributes: TesRuntimeAttributes,
workflowDescriptor: BackendWorkflowDescriptor): Resources = {
workflowExecutionId: Option[String]): Resources = {

// This was added in BT-409 to let us pass information to an Azure
// TES server about which user identity to run tasks as.
// Note that we validate the type of WorkflowExecutionIdentity
// in TesInitializationActor.
val backendParameters = runtimeAttributes.backendParameters ++
workflowDescriptor
.workflowOptions
.get(TesWorkflowOptionKeys.WorkflowExecutionIdentity)
.toOption
workflowExecutionId
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
.toMap

val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
case Some(x) =>
Option(x.to(MemoryUnit.GB).amount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package cromwell.backend.impl.tes

import common.assertion.CromwellTimeoutSpec
import common.mock.MockSugar
import cromwell.backend.{BackendSpec, TestConfig}
import cromwell.backend.validation.ContinueOnReturnCodeSet
import cromwell.backend.{BackendSpec, TestConfig}
import cromwell.core.WorkflowOptions
import cromwell.core.labels.Labels
import cromwell.core.logging.JobLogger
import cromwell.core.path.DefaultPathBuilder
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import spray.json.{JsObject, JsString, JsValue}
import spray.json.{JsObject, JsValue}
import wom.InstantiatedCommand

class TesTaskSpec
Expand All @@ -32,32 +32,44 @@ class TesTaskSpec
Map.empty
)

def workflowDescriptorWithIdentity(excIdentity: Option[String]) = {
val optionsMap: Map[String, JsValue] = excIdentity.map(i => TesWorkflowOptionKeys.WorkflowExecutionIdentity -> JsString(i)).toMap
buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld, None, WorkflowOptions(JsObject(optionsMap)))
}

it should "create the correct resources when an identity is passed in WorkflowOptions" in {
val wd = workflowDescriptorWithIdentity(Option("abc123"))
TesTask.makeResources(runtimeAttributes, wd) shouldEqual
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
)
}

it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
val wd = workflowDescriptorWithIdentity(Option(""))
TesTask.makeResources(runtimeAttributes, wd) shouldEqual
val wei = Option("")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
)
}

it should "create the correct resources when no identity is passed in WorkflowOptions" in {
val wd = workflowDescriptorWithIdentity(None)
TesTask.makeResources(runtimeAttributes, wd) shouldEqual
val wei = None
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
)
}

it should "create the correct resources when an identity is passed in via backend config" in {
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
)
}

it should "create the correct resources when no identity is passed in via backend config" in {
val weic = None
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
)
}

it should "copy labels to tags" in {
val jobLogger = mock[JobLogger]
Expand Down

0 comments on commit 60c2c86

Please sign in to comment.