Skip to content

Commit

Permalink
Add launch entity type to execution model (flyteorg#501)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Dec 10, 2022
1 parent 5df4f41 commit 77157ff
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 15 deletions.
2 changes: 2 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: taskIdentifier.ResourceType,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -974,6 +975,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down
39 changes: 25 additions & 14 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func TestCreateExecution(t *testing.T) {
assert.Equal(t, principal, spec.Metadata.Principal)
assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix)
assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment))
assert.Equal(t, "launch_plan", input.LaunchEntity)
return nil
})
setDefaultLpCallbackForExecTest(repository)
Expand Down Expand Up @@ -4099,20 +4100,21 @@ func TestCreateSingleTaskExecution(t *testing.T) {
return newlyCreatedWorkflow, nil
}
repository.WorkflowRepo().(*repositoryMocks.MockWorkflowRepo).SetGetCallback(workflowGetFunc)
taskIdentifier := &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "flytekit",
Domain: "production",
Name: "simple_task",
Version: "12345",
}
repository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetGetCallback(
func(input interfaces.Identifier) (models.Task, error) {
createdAt := time.Now()
createdAtProto, _ := ptypes.TimestampProto(createdAt)
taskClosure := &admin.TaskClosure{
CompiledTask: &core.CompiledTask{
Template: &core.TaskTemplate{
Id: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "flytekit",
Domain: "production",
Name: "simple_task",
Version: "12345",
},
Id: taskIdentifier,
Type: "python-task",
Metadata: &core.TaskMetadata{
Runtime: &core.RuntimeMetadata{
Expand Down Expand Up @@ -4199,6 +4201,21 @@ func TestCreateSingleTaskExecution(t *testing.T) {
Type: "python",
}, nil
})
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, models.ExecutionKey{
Project: "flytekit",
Domain: "production",
Name: "singletaskexec",
}, input.ExecutionKey)
assert.Equal(t, "task", input.LaunchEntity)
assert.Equal(t, "UNDEFINED", input.Phase)
assert.True(t, proto.Equal(taskIdentifier, spec.LaunchPlan))
return nil
})

mockStorage := getMockStorageForExecTest(context.Background())
workflowManager := NewWorkflowManager(
Expand All @@ -4218,13 +4235,7 @@ func TestCreateSingleTaskExecution(t *testing.T) {
Domain: "production",
Name: "singletaskexec",
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: "flytekit",
Domain: "production",
Name: "simple_task",
Version: "12345",
ResourceType: core.ResourceType_TASK,
},
LaunchPlan: taskIdentifier,
},
Inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down
10 changes: 10 additions & 0 deletions pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,16 @@ var Migrations = []*gormigrate.Migration{
return tx.Model(&models.Execution{}).Migrator().DropIndex(&models.Execution{}, "idx_executions_created_at")
},
},
// Add the launch_type resource to the execution model
{
ID: "2022-12-09-execution-launch-type",
Migrate: func(tx *gorm.DB) error {
return tx.AutoMigrate(&models.Execution{})
},
Rollback: func(tx *gorm.DB) error {
return tx.Model(&models.Execution{}).Migrator().DropColumn(&models.Execution{}, "launch_type")
},
},
}

func alterTableColumnType(db *sql.DB, columnName, columnType string) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/repositories/gormimpl/execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func getMockExecutionResponseFromDb(expected models.Execution) map[string]interf
execution["execution_updated_at"] = expected.ExecutionUpdatedAt
execution["duration"] = expected.Duration
execution["mode"] = expected.Mode
execution["launch_entity"] = expected.LaunchEntity
return execution
}

Expand All @@ -118,6 +119,7 @@ func TestGetExecution(t *testing.T) {
StartedAt: &executionStartedAt,
ExecutionCreatedAt: &createdAt,
ExecutionUpdatedAt: &executionUpdatedAt,
LaunchEntity: "task",
}

executions := make([]map[string]interface{}, 0)
Expand Down Expand Up @@ -303,14 +305,15 @@ func TestListExecutionsForWorkflow(t *testing.T) {
Spec: []byte{3, 4},
StartedAt: &executionStartedAt,
Duration: time.Hour,
LaunchEntity: "launch_plan",
})
executions = append(executions, execution)

GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true

// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND (workflows.name = $4) AND tasks.name = $5 LIMIT 20`).WithReply(executions)
GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND (workflows.name = $4) AND tasks.name = $5 LIMIT 20`).WithReply(executions)

collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -341,6 +344,7 @@ func TestListExecutionsForWorkflow(t *testing.T) {
assert.Equal(t, []byte{3, 4}, execution.Spec)
assert.Equal(t, executionStartedAt, *execution.StartedAt)
assert.Equal(t, time.Hour, execution.Duration)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/repositories/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ type Execution struct {
User string `gorm:"index" valid:"length(0|255)"`
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"index;default:0"`
// The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task'
LaunchEntity string
}
3 changes: 3 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transformers
import (
"context"
"fmt"
"strings"
"time"

"github.com/flyteorg/flyteadmin/pkg/common"
Expand Down Expand Up @@ -40,6 +41,7 @@ type CreateExecutionModelInput struct {
InputsURI storage.DataReference
UserInputsURI storage.DataReference
SecurityContext *core.SecurityContext
LaunchEntity core.ResourceType
}

// CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model
Expand Down Expand Up @@ -102,6 +104,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
UserInputsURI: input.UserInputsURI,
User: requestSpec.Metadata.Principal,
State: &activeExecution,
LaunchEntity: strings.ToLower(input.LaunchEntity.String()),
}
// A reference launch entity can be one of either or a task OR launch plan. Traditionally, workflows are executed
// with a reference launch plan which is why this behavior is the default below.
Expand Down
2 changes: 2 additions & 0 deletions pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestCreateExecutionModel(t *testing.T) {
SourceExecutionID: sourceID,
Cluster: cluster,
SecurityContext: securityCtx,
LaunchEntity: core.ResourceType_LAUNCH_PLAN,
})
assert.NoError(t, err)
assert.Equal(t, "project", execution.Project)
Expand All @@ -100,6 +101,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, int32(admin.ExecutionMetadata_SYSTEM), execution.Mode)
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand Down

0 comments on commit 77157ff

Please sign in to comment.