Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-859 Accept workflow execution identity in config #6967

Merged
merged 5 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ class TesConfiguration(val configurationDescriptor: BackendConfigurationDescript
val useBackendParameters =
configurationDescriptor
.backendConfig
.as[Option[Boolean]](TesConfiguration.useBackendParamtersKey)
.as[Option[Boolean]](TesConfiguration.useBackendParametersKey)
.getOrElse(false)
}

object TesConfiguration {
final val useBackendParamtersKey = "use_tes_11_preview_backend_parameters"
final val useBackendParametersKey = "use_tes_11_preview_backend_parameters"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package cromwell.backend.impl.tes
import common.collections.EnhancedCollections._
import common.util.StringUtil._
import cromwell.backend.impl.tes.OutputMode.OutputMode
import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptor, BackendWorkflowDescriptor}
import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptor}
import cromwell.core.logging.JobLogger
import cromwell.core.path.{DefaultPathBuilder, Path}
import net.ceedubs.ficus.Ficus._

import scala.language.postfixOps
import scala.util.Try

import wdl.draft2.model.FullyQualifiedName
import wdl4s.parser.MemoryUnit
import wom.InstantiatedCommand
import wom.callable.Callable.OutputDefinition
import wom.expression.NoIoFunctionSet
import wom.values._

import scala.language.postfixOps
import scala.util.Try

final case class WorkflowExecutionIdentityConfig(value: String) {override def toString: String = value.toString}
final case class WorkflowExecutionIdentityOption(value: String) {override def toString: String = value}
final case class TesTask(jobDescriptor: BackendJobDescriptor,
configurationDescriptor: BackendConfigurationDescriptor,
jobLogger: JobLogger,
Expand All @@ -32,6 +36,16 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
private val workflowDescriptor = jobDescriptor.workflowDescriptor
private val workflowName = workflowDescriptor.callable.name
private val fullyQualifiedTaskName = jobDescriptor.taskCall.fullyQualifiedName
private val workflowExecutionIdentityConfig: Option[WorkflowExecutionIdentityConfig] =
configurationDescriptor.backendConfig
.getAs[String]("workflow-execution-identity")
.map(WorkflowExecutionIdentityConfig)
private val workflowExecutionIdentityOption: Option[WorkflowExecutionIdentityOption] =
workflowDescriptor
.workflowOptions
.get(TesWorkflowOptionKeys.WorkflowExecutionIdentity)
.toOption
.map(WorkflowExecutionIdentityOption)
val name: String = fullyQualifiedTaskName
val description: String = jobDescriptor.toString

Expand Down Expand Up @@ -212,7 +226,15 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
result
}

val resources: Resources = TesTask.makeResources(runtimeAttributes, workflowDescriptor)
val preferedWorkflowExecutionIdentity = TesTask.getPreferredWorkflowExecutionIdentity(
workflowExecutionIdentityConfig,
workflowExecutionIdentityOption
)

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity
)

val executors = Seq(Executor(
image = dockerImageUsed,
Expand All @@ -226,21 +248,22 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
}

object TesTask {
// Helper to determine which source to use for a workflowExecutionIdentity
def getPreferredWorkflowExecutionIdentity(configIdentity: Option[WorkflowExecutionIdentityConfig],
workflowOptionsIdentity: Option[WorkflowExecutionIdentityOption]): Option[String] = {
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
}
def makeResources(runtimeAttributes: TesRuntimeAttributes,
workflowDescriptor: BackendWorkflowDescriptor): Resources = {
workflowExecutionId: Option[String]): Resources = {

// This was added in BT-409 to let us pass information to an Azure
// TES server about which user identity to run tasks as.
// Note that we validate the type of WorkflowExecutionIdentity
// in TesInitializationActor.
val backendParameters = runtimeAttributes.backendParameters ++
workflowDescriptor
.workflowOptions
.get(TesWorkflowOptionKeys.WorkflowExecutionIdentity)
.toOption
workflowExecutionId
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
.toMap

val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
case Some(x) =>
Option(x.to(MemoryUnit.GB).amount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package cromwell.backend.impl.tes

import common.assertion.CromwellTimeoutSpec
import common.mock.MockSugar
import cromwell.backend.{BackendSpec, TestConfig}
import cromwell.backend.validation.ContinueOnReturnCodeSet
import cromwell.backend.{BackendSpec, TestConfig}
import cromwell.core.WorkflowOptions
import cromwell.core.labels.Labels
import cromwell.core.logging.JobLogger
import cromwell.core.path.DefaultPathBuilder
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import spray.json.{JsObject, JsString, JsValue}
import spray.json.{JsObject, JsValue}
import wom.InstantiatedCommand

class TesTaskSpec
Expand All @@ -32,32 +32,44 @@ class TesTaskSpec
Map.empty
)

def workflowDescriptorWithIdentity(excIdentity: Option[String]) = {
val optionsMap: Map[String, JsValue] = excIdentity.map(i => TesWorkflowOptionKeys.WorkflowExecutionIdentity -> JsString(i)).toMap
buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld, None, WorkflowOptions(JsObject(optionsMap)))
}

it should "create the correct resources when an identity is passed in WorkflowOptions" in {
val wd = workflowDescriptorWithIdentity(Option("abc123"))
TesTask.makeResources(runtimeAttributes, wd) shouldEqual
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
)
}

it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
val wd = workflowDescriptorWithIdentity(Option(""))
TesTask.makeResources(runtimeAttributes, wd) shouldEqual
val wei = Option("")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
)
}

it should "create the correct resources when no identity is passed in WorkflowOptions" in {
val wd = workflowDescriptorWithIdentity(None)
TesTask.makeResources(runtimeAttributes, wd) shouldEqual
val wei = None
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
)
}

it should "create the correct resources when an identity is passed in via backend config" in {
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
)
}

it should "create the correct resources when no identity is passed in via backend config" in {
val weic = None
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
)
}

it should "copy labels to tags" in {
val jobLogger = mock[JobLogger]
Expand Down