Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Flyte Execution tags #571

Merged
merged 30 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,5 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091
// Retracted versions
// This was published in error when attempting to create 1.5.1 Flyte release.
retract v1.1.94

replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.7 h1:voAxMMFsKOseNFSlCyRGlpegqtQXtJjyxgsQzZg4tts=
github.com/flyteorg/flyteidl v1.5.7/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d h1:3wgeLYVRYZTM5v2GO03Tg65HU6G1VIxPSerx9x3Xc2k=
github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw=
github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU=
github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM=
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
NamedEntityMetadata = "nem"
Project = "p"
Signal = "s"
AdminTag = "at"
ExecutionAdminTag = "eat"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,12 +907,14 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
Tags: request.Spec.Tags,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, err
}

return ctx, executionModel, nil
}

Expand Down Expand Up @@ -1478,6 +1480,7 @@ func (m *ExecutionManager) ListExecutions(
execution.Spec.Inputs = nil
execution.Closure.ComputedInputs = nil
}

// END TO BE DELETED
var token string
if len(executionList) == int(request.Limit) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/impl/util/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var filterFieldEntityPrefix = map[string]common.Entity{
"named_entity_metadata": common.NamedEntityMetadata,
"project": common.Project,
"signal": common.Signal,
"admin_tag": common.AdminTag,
"execution_admin_tag": common.ExecutionAdminTag,
}

func parseField(field string, primaryEntity common.Entity) (common.Entity, string) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,26 @@ var LegacyMigrations = []*gormigrate.Migration{
return tx.Model(&models.Execution{}).Migrator().DropColumn(&models.Execution{}, "launch_entity")
},
},
// Create admin tags table.
{
ID: "2023-06-01-admin_tags",
Migrate: func(tx *gorm.DB) error {
return tx.AutoMigrate(&models.AdminTag{})
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("admin_tags")
},
},
// Add execution <-> admin_tags join table.
{
ID: "2023-06-01-execution_admin_tags",
Migrate: func(tx *gorm.DB) error {
return tx.AutoMigrate(&models.Execution{})
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_admin_tags")
},
},
}

var NoopMigrations = []*gormigrate.Migration{
Expand Down
4 changes: 4 additions & 0 deletions pkg/repositories/gormimpl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const taskExecutionTableName = "task_executions"
const taskTableName = "tasks"
const workflowTableName = "workflows"
const descriptionEntityTableName = "description_entities"
const AdminTagsTableName = "admin_tags"
const executionAdminTagsTableName = "execution_admin_tags"

const limit = "limit"
const filters = "filters"
Expand All @@ -45,6 +47,8 @@ var entityToTableName = map[common.Entity]string{
common.NamedEntity: "entities",
common.NamedEntityMetadata: "named_entity_metadata",
common.Signal: "signals",
common.AdminTag: "admin_tags",
common.ExecutionAdminTag: "execution_admin_tags",
}

var innerJoinExecToNodeExec = fmt.Sprintf(
Expand Down
11 changes: 9 additions & 2 deletions pkg/repositories/gormimpl/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) erro
return nil
}

func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
func (r *ExecutionRepo) Get(_ context.Context, input interfaces.Identifier) (models.Execution, error) {
var execution models.Execution
timer := r.metrics.GetDuration.Start()
tx := r.db.Where(&models.Execution{
Expand Down Expand Up @@ -66,7 +66,7 @@ func (r *ExecutionRepo) Update(ctx context.Context, execution models.Execution)
return nil
}

func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) (
func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInput) (
interfaces.ExecutionCollectionOutput, error) {
var err error
// First validate input.
Expand All @@ -89,6 +89,13 @@ func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceI
taskTableName, executionTableName, taskTableName))
}

if ok := input.JoinTableEntities[common.AdminTag]; ok {
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name",
executionAdminTagsTableName, executionTableName, executionAdminTagsTableName))
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.id = %s.admin_tag_id",
AdminTagsTableName, AdminTagsTableName, executionAdminTagsTableName))
}

// Apply filters
tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters)
if err != nil {
Expand Down
38 changes: 35 additions & 3 deletions pkg/repositories/gormimpl/execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,35 @@ func TestListExecutions_Order(t *testing.T) {
assert.True(t, mockQuery.Triggered)
}

func TestListExecutions_WithTags(t *testing.T) {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())

executions := make([]map[string]interface{}, 0)
GlobalMock := mocket.Catcher.Reset()
// Only match on queries that include ordering by name
mockQuery := GlobalMock.NewMock().WithQuery(`name asc`)
mockQuery.WithReply(executions)

sortParameter, _ := common.NewSortParameter(admin.Sort{
Direction: admin.Sort_ASCENDING,
Key: "name",
})
vals := []string{"tag1", "tag2"}
tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "admin_tag_name", vals)
_, err = executionRepo.List(context.Background(), interfaces.ListResourceInput{
SortParameter: sortParameter,
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.Task, "project", project),
getEqualityFilter(common.Task, "domain", domain),
getEqualityFilter(common.Task, "name", name),
tagFilter,
},
Limit: 20,
})
assert.NoError(t, err)
assert.True(t, mockQuery.Triggered)
}

func TestListExecutions_MissingParameters(t *testing.T) {
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
_, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
Expand Down Expand Up @@ -306,29 +335,32 @@ func TestListExecutionsForWorkflow(t *testing.T) {
StartedAt: &executionStartedAt,
Duration: time.Hour,
LaunchEntity: "launch_plan",
Tags: []models.AdminTag{{Name: "tag1"}, {Name: "tag2"}},
})
executions = append(executions, execution)

GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true

// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 LIMIT 20`).WithReply(executions)

GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 AND execution_admin_tags.execution_tag_name in ($6,$7) LIMIT 20`).WithReply(executions)
vals := []string{"tag1", "tag2"}
tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "execution_tag_name", vals)
collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.Execution, "project", project),
getEqualityFilter(common.Execution, "domain", domain),
getEqualityFilter(common.Execution, "name", "1"),
getEqualityFilter(common.Workflow, "name", "workflow_name"),
getEqualityFilter(common.Task, "name", "task_name"),
tagFilter,
},
Limit: 20,
JoinTableEntities: map[common.Entity]bool{
common.Workflow: true,
common.Task: true,
},
})

assert.NoError(t, err)
assert.NotEmpty(t, collection)
assert.NotEmpty(t, collection.Executions)
Expand Down
8 changes: 8 additions & 0 deletions pkg/repositories/models/execution.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"gorm.io/gorm"
"time"

"github.com/flyteorg/flytestdlib/storage"
Expand Down Expand Up @@ -60,4 +61,11 @@ type Execution struct {
State *int32 `gorm:"index;default:0"`
// The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task'
LaunchEntity string

Tags []AdminTag `gorm:"many2many:execution_admin_tags;"`
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
}

type AdminTag struct {
gorm.Model
Name string `valid:"length(0|255)"`
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
}
7 changes: 7 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type CreateExecutionModelInput struct {
SecurityContext *core.SecurityContext
LaunchEntity core.ResourceType
Namespace string
Tags []string
}

type ExecutionTransformerOptions struct {
Expand Down Expand Up @@ -99,6 +100,11 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
}

activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
tags := make([]models.AdminTag, len(input.Tags))
for i, tag := range input.Tags {
tags[i] = models.AdminTag{Name: tag}
}

executionModel := &models.Execution{
ExecutionKey: models.ExecutionKey{
Project: input.WorkflowExecutionID.Project,
Expand All @@ -119,6 +125,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
User: requestSpec.Metadata.Principal,
State: &activeExecution,
LaunchEntity: strings.ToLower(input.LaunchEntity.String()),
Tags: tags,
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
}
// A reference launch entity can be one of either or a task OR launch plan. Traditionally, workflows are executed
// with a reference launch plan which is why this behavior is the default below.
Expand Down