Skip to content

Commit

Permalink
Single task execution (admin-generated workflow implementation) (fly…
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored May 21, 2020
1 parent 026c40f commit ef390a9
Show file tree
Hide file tree
Showing 18 changed files with 1,722 additions and 66 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.17.27
github.com/lyft/flyteidl v0.17.30
github.com/lyft/flytepropeller v0.2.13
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ github.com/lyft/flyteidl v0.17.8 h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0=
github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.27 h1:0EdSHauzdPEYmubYib/XC6fLb+srzP4yDRN1P9o4W/I=
github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.28-0.20200427212215-1d2c77c864b2 h1:+VDdSC4HMWBhlrjMcAS+1kGp+bK1e5OJR81tjzYJS7M=
github.com/lyft/flyteidl v0.17.28-0.20200427212215-1d2c77c864b2/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.30 h1:mL767VAPI6+bCRuVE6L6fl0uQXeu1TcnQf02QMQ0fQc=
github.com/lyft/flyteidl v0.17.30/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flytepropeller v0.2.13 h1:RDFM8ps5bHWdHYK87NLyYX4iyF16ahkxerI0X9DZSfM=
github.com/lyft/flytepropeller v0.2.13/go.mod h1:QJ9txCCxHnzvwQoG4TbcldVs1in4+C943prLZVDmmIA=
Expand Down
159 changes: 158 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type ExecutionManager struct {
userMetrics executionUserMetrics
notificationClient notificationInterfaces.Publisher
urlData dataInterfaces.RemoteURLInterface
workflowManager interfaces.WorkflowInterface
namedEntityManager interfaces.NamedEntityInterface
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -303,6 +305,150 @@ func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Confi
taskResourceSpec)
}

func (m *ExecutionManager) launchSingleTaskExecution(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {

taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.GetResourceInput{
Project: request.Spec.LaunchPlan.Project,
Domain: request.Spec.LaunchPlan.Domain,
Name: request.Spec.LaunchPlan.Name,
Version: request.Spec.LaunchPlan.Version,
})
if err != nil {
return nil, nil, err
}
task, err := transformers.FromTaskModel(taskModel)
if err != nil {
return nil, nil, err
}

// Prepare a skeleton workflow
taskIdentifier := request.Spec.LaunchPlan
workflowModel, err :=
util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
if err != nil {
logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(*workflowModel)
if err != nil {
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)
if err != nil {
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure
// Also prepare a skeleton launch plan.
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
if err != nil {
return nil, nil, err
}

name := util.GetExecutionName(request)
workflowExecutionID := core.WorkflowExecutionIdentifier{
Project: request.Project,
Domain: request.Domain,
Name: name,
}
ctx = getExecutionContext(ctx, &workflowExecutionID)

// Get the node execution (if any) that launched this execution
var parentNodeExecutionID uint
if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}

parentNodeExecutionID = parentNodeExecutionModel.ID
}

// Dynamically assign task resource defaults.
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
setCompiledTaskDefaults(ctx, m.config, task, m.db, name)
}

// Dynamically assign execution queues.
m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow)

inputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.Inputs)
if err != nil {
return nil, nil, err
}
userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs)
if err != nil {
return nil, nil, err
}
executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: request.Spec.AuthRole,
}
if request.Spec.Labels != nil {
executeTaskInputs.Labels = request.Spec.Labels.Values
}
if request.Spec.Annotations != nil {
executeTaskInputs.Annotations = request.Spec.Annotations.Values
}

execInfo, err := m.workflowExecutor.ExecuteTask(ctx, executeTaskInputs)
if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, request.Inputs, err)
return nil, nil, err
}
executionCreatedAt := time.Now()
acceptanceDelay := executionCreatedAt.Sub(requestedAt)
m.systemMetrics.AcceptanceDelay.Observe(acceptanceDelay.Seconds())

// Request notification settings takes precedence over the launch plan settings.
// If there is no notification in the request and DisableAll is not true, use the settings from the launch plan.
var notificationsSettings []*admin.Notification
if launchPlan.Spec.GetEntityMetadata() != nil {
notificationsSettings = launchPlan.Spec.EntityMetadata.GetNotifications()
}
if request.Spec.GetNotifications() != nil && request.Spec.GetNotifications().Notifications != nil &&
len(request.Spec.GetNotifications().Notifications) > 0 {
notificationsSettings = request.Spec.GetNotifications().Notifications
} else if request.Spec.GetDisableAll() {
notificationsSettings = make([]*admin.Notification, 0)
}

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: request.Spec,
TaskID: taskModel.ID,
WorkflowID: workflowModel.ID,
// The execution is not considered running until the propeller sends a specific event saying so.
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: m._clock.Now(),
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
Principal: getUser(ctx),
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, err
}
return ctx, executionModel, nil

}

func (m *ExecutionManager) launchExecutionAndPrepareModel(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
Expand All @@ -311,6 +457,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err)
return nil, nil, err
}
if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK {
logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan)
return m.launchSingleTaskExecution(ctx, request, requestedAt)
}

launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan)
if err != nil {
logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err)
Expand All @@ -333,7 +484,9 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err)
return nil, nil, err
}

workflow, err := util.GetWorkflow(ctx, m.db, m.storageClient, *launchPlan.Spec.WorkflowId)

if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
Expand Down Expand Up @@ -1050,7 +1203,9 @@ func NewExecutionManager(
systemScope promutils.Scope,
userScope promutils.Scope,
publisher notificationInterfaces.Publisher,
urlData dataInterfaces.RemoteURLInterface) interfaces.ExecutionInterface {
urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface,
namedEntityManager interfaces.NamedEntityInterface) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

Expand All @@ -1070,5 +1225,7 @@ func NewExecutionManager(
userMetrics: userMetrics,
notificationClient: publisher,
urlData: urlData,
workflowManager: workflowManager,
namedEntityManager: namedEntityManager,
}
}
Loading

0 comments on commit ef390a9

Please sign in to comment.