Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Flyte Execution tags #571

Merged
merged 30 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.5.11
github.com/flyteorg/flyteidl v1.5.14
github.com/flyteorg/flyteplugins v1.0.67
github.com/flyteorg/flytepropeller v1.1.98
github.com/flyteorg/flytestdlib v1.0.20
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0=
github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI=
github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
NamedEntityMetadata = "nem"
Project = "p"
Signal = "s"
AdminTag = "at"
ExecutionAdminTag = "eat"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
workflowExecutionID, err)
return nil, nil, err
}

return ctx, executionModel, nil
}

Expand Down Expand Up @@ -1478,6 +1479,7 @@ func (m *ExecutionManager) ListExecutions(
execution.Spec.Inputs = nil
execution.Closure.ComputedInputs = nil
}

// END TO BE DELETED
var token string
if len(executionList) == int(request.Limit) {
Expand Down
1 change: 1 addition & 0 deletions pkg/manager/impl/testutils/mock_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func GetExecutionRequest() admin.ExecutionCreateRequest {
},
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: "default_raw_output"},
Envs: &admin.Envs{},
Tags: []string{"tag1", "tag2"},
},
Inputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/impl/util/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var filterFieldEntityPrefix = map[string]common.Entity{
"named_entity_metadata": common.NamedEntityMetadata,
"project": common.Project,
"signal": common.Signal,
"admin_tag": common.AdminTag,
"execution_admin_tag": common.ExecutionAdminTag,
}

func parseField(field string, primaryEntity common.Entity) (common.Entity, string) {
Expand Down
83 changes: 83 additions & 0 deletions pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,89 @@ var NoopMigrations = []*gormigrate.Migration{
return nil
},
},

{
ID: "2023-08-04-admin-tags",
Migrate: func(tx *gorm.DB) error {
type AdminTag struct {
gorm.Model
Name string `gorm:"index:,unique;size:255"`
}

return tx.AutoMigrate(&AdminTag{})
},
Rollback: func(tx *gorm.DB) error {
return nil
},
},

{
ID: "2023-08-04-execution-admin-tags", // A join table used to associate executions with tags
Migrate: func(tx *gorm.DB) error {
type AdminTag struct {
gorm.Model
Name string `gorm:"index:,unique;size:255"`
}

type ExecutionKey struct {
Project string `gorm:"primary_key;column:execution_project" valid:"length(0|255)"`
Domain string `gorm:"primary_key;column:execution_domain" valid:"length(0|255)"`
Name string `gorm:"primary_key;column:execution_name" valid:"length(0|255)"`
}

type Execution struct {
ID uint `gorm:"index;autoIncrement;not null"`
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time `gorm:"index"`
ExecutionKey
LaunchPlanID uint `gorm:"index"`
WorkflowID uint `gorm:"index"`
TaskID uint `gorm:"index"`
Phase string `valid:"length(0|255)"`
Closure []byte
Spec []byte `gorm:"not null"`
StartedAt *time.Time
// Corresponds to the CreatedAt field in the Execution closure.
// Prefixed with Execution to avoid clashes with gorm.Model CreatedAt
ExecutionCreatedAt *time.Time `gorm:"index:idx_executions_created_at"`
// Corresponds to the UpdatedAt field in the Execution closure
// Prefixed with Execution to avoid clashes with gorm.Model UpdatedAt
ExecutionUpdatedAt *time.Time
Duration time.Duration
// In the case of an aborted execution this string may be non-empty.
// It should be ignored for any other value of phase other than aborted.
AbortCause string `valid:"length(0|255)"`
// Corresponds to the execution mode used to trigger this execution
Mode int32
// The "parent" execution (if there is one) that is related to this execution.
SourceExecutionID uint
// The parent node execution if this was launched by a node
ParentNodeExecutionID uint
// Cluster where execution was triggered
Cluster string `valid:"length(0|255)"`
// Offloaded location of inputs LiteralMap. These are the inputs evaluated and contain applied defaults.
InputsURI storage.DataReference
// User specified inputs. This map might be incomplete and not include defaults applied
UserInputsURI storage.DataReference
// Execution Error Kind. nullable
ErrorKind *string `gorm:"index"`
// Execution Error Code nullable
ErrorCode *string `valid:"length(0|255)"`
// The user responsible for launching this execution.
// This is also stored in the spec but promoted as a column for filtering.
User string `gorm:"index" valid:"length(0|255)"`
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"index;default:0"`
// The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task'
LaunchEntity string
// Tags associated with the execution
Tags []AdminTag `gorm:"many2many:execution_admin_tags;"`
}

return tx.AutoMigrate(&Execution{})
},
},
}

var Migrations = append(LegacyMigrations, NoopMigrations...)
Expand Down
4 changes: 4 additions & 0 deletions pkg/repositories/gormimpl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const taskExecutionTableName = "task_executions"
const taskTableName = "tasks"
const workflowTableName = "workflows"
const descriptionEntityTableName = "description_entities"
const AdminTagsTableName = "admin_tags"
const executionAdminTagsTableName = "execution_admin_tags"

const limit = "limit"
const filters = "filters"
Expand All @@ -45,6 +47,8 @@ var entityToTableName = map[common.Entity]string{
common.NamedEntity: "entities",
common.NamedEntityMetadata: "named_entity_metadata",
common.Signal: "signals",
common.AdminTag: "admin_tags",
common.ExecutionAdminTag: "execution_admin_tags",
}

var innerJoinExecToNodeExec = fmt.Sprintf(
Expand Down
11 changes: 9 additions & 2 deletions pkg/repositories/gormimpl/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) erro
return nil
}

func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
func (r *ExecutionRepo) Get(_ context.Context, input interfaces.Identifier) (models.Execution, error) {
var execution models.Execution
timer := r.metrics.GetDuration.Start()
tx := r.db.Where(&models.Execution{
Expand Down Expand Up @@ -66,7 +66,7 @@ func (r *ExecutionRepo) Update(ctx context.Context, execution models.Execution)
return nil
}

func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) (
func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInput) (
interfaces.ExecutionCollectionOutput, error) {
var err error
// First validate input.
Expand All @@ -89,6 +89,13 @@ func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceI
taskTableName, executionTableName, taskTableName))
}

if ok := input.JoinTableEntities[common.AdminTag]; ok {
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name",
executionAdminTagsTableName, executionTableName, executionAdminTagsTableName))
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.id = %s.admin_tag_id",
AdminTagsTableName, AdminTagsTableName, executionAdminTagsTableName))
}

// Apply filters
tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters)
if err != nil {
Expand Down
40 changes: 37 additions & 3 deletions pkg/repositories/gormimpl/execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,36 @@ func TestListExecutions_Order(t *testing.T) {
assert.True(t, mockQuery.Triggered)
}

func TestListExecutions_WithTags(t *testing.T) {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())

executions := make([]map[string]interface{}, 0)
GlobalMock := mocket.Catcher.Reset()
// Only match on queries that include ordering by name
mockQuery := GlobalMock.NewMock().WithQuery(`name asc`)
mockQuery.WithReply(executions)

sortParameter, _ := common.NewSortParameter(admin.Sort{
Direction: admin.Sort_ASCENDING,
Key: "name",
})
vals := []string{"tag1", "tag2"}
tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "admin_tag_name", vals)
assert.NoError(t, err)
_, err = executionRepo.List(context.Background(), interfaces.ListResourceInput{
SortParameter: sortParameter,
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.Task, "project", project),
getEqualityFilter(common.Task, "domain", domain),
getEqualityFilter(common.Task, "name", name),
tagFilter,
},
Limit: 20,
})
assert.NoError(t, err)
assert.True(t, mockQuery.Triggered)
}

func TestListExecutions_MissingParameters(t *testing.T) {
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
_, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
Expand Down Expand Up @@ -306,29 +336,33 @@ func TestListExecutionsForWorkflow(t *testing.T) {
StartedAt: &executionStartedAt,
Duration: time.Hour,
LaunchEntity: "launch_plan",
Tags: []models.AdminTag{{Name: "tag1"}, {Name: "tag2"}},
})
executions = append(executions, execution)

GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true

// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 LIMIT 20`).WithReply(executions)

GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 AND execution_admin_tags.execution_tag_name in ($6,$7) LIMIT 20`).WithReply(executions)
vals := []string{"tag1", "tag2"}
tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "execution_tag_name", vals)
assert.NoError(t, err)
collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.Execution, "project", project),
getEqualityFilter(common.Execution, "domain", domain),
getEqualityFilter(common.Execution, "name", "1"),
getEqualityFilter(common.Workflow, "name", "workflow_name"),
getEqualityFilter(common.Task, "name", "task_name"),
tagFilter,
},
Limit: 20,
JoinTableEntities: map[common.Entity]bool{
common.Workflow: true,
common.Task: true,
},
})

assert.NoError(t, err)
assert.NotEmpty(t, collection)
assert.NotEmpty(t, collection.Executions)
Expand Down
19 changes: 19 additions & 0 deletions pkg/repositories/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package models
import (
"time"

"gorm.io/gorm/clause"

"gorm.io/gorm"

"github.com/flyteorg/flytestdlib/storage"
)

Expand Down Expand Up @@ -60,4 +64,19 @@ type Execution struct {
State *int32 `gorm:"index;default:0"`
// The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task'
LaunchEntity string
// Tags associated with the execution
Tags []AdminTag `gorm:"many2many:execution_admin_tags;"`
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
}

type AdminTag struct {
gorm.Model
Name string `gorm:"index:,unique;size:255"`
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *AdminTag) BeforeCreate(tx *gorm.DB) (err error) {
tx.Statement.AddClause(clause.OnConflict{
Columns: []clause.Column{{Name: "name"}}, // key column
DoUpdates: clause.AssignmentColumns([]string{"name"}), // column needed to be updated
})
return nil
}
6 changes: 6 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
}

activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
tags := make([]models.AdminTag, len(input.RequestSpec.Tags))
for i, tag := range input.RequestSpec.Tags {
tags[i] = models.AdminTag{Name: tag}
}
eapolinario marked this conversation as resolved.
Show resolved Hide resolved

executionModel := &models.Execution{
ExecutionKey: models.ExecutionKey{
Project: input.WorkflowExecutionID.Project,
Expand All @@ -119,6 +124,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
User: requestSpec.Metadata.Principal,
State: &activeExecution,
LaunchEntity: strings.ToLower(input.LaunchEntity.String()),
Tags: tags,
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
}
// A reference launch entity can be one of either or a task OR launch plan. Traditionally, workflows are executed
// with a reference launch plan which is why this behavior is the default below.
Expand Down
4 changes: 4 additions & 0 deletions tests/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func truncateAllTablesForTestingOnly() {
TruncateResources := fmt.Sprintf("TRUNCATE TABLE resources;")
TruncateSchedulableEntities := fmt.Sprintf("TRUNCATE TABLE schedulable_entities;")
TruncateSchedulableEntitiesSnapshots := fmt.Sprintf("TRUNCATE TABLE schedule_entities_snapshots;")
TruncateAdminTags := fmt.Sprintf("TRUNCATE TABLE admin_tags;")
TruncateExecutionAdminTags := fmt.Sprintf("TRUNCATE TABLE execution_admin_tags;")
ctx := context.Background()
db, err := repositories.GetDB(ctx, getDbConfig(), getLoggerConfig())
if err != nil {
Expand Down Expand Up @@ -100,6 +102,8 @@ func truncateAllTablesForTestingOnly() {
db.Exec(TruncateResources)
db.Exec(TruncateSchedulableEntities)
db.Exec(TruncateSchedulableEntitiesSnapshots)
db.Exec(TruncateAdminTags)
db.Exec(TruncateExecutionAdminTags)
}

func populateWorkflowExecutionForTestingOnly(project, domain, name string) {
Expand Down
28 changes: 28 additions & 0 deletions tests/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ func populateWorkflowExecutionsForTestingOnly() {
db.Exec(`INSERT INTO workflows ("id", "project", "domain", "name", "version", "remote_closure_identifier") ` +
`VALUES (4, 'project2', 'domain2', 'name2', 'version1', 's3://foo')`)

// Insert dummy tags
db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (1, 'hello')`)
db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (2, 'flyte')`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 1)`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 2)`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name3', 2)`)
db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name4', 1)`)

for _, statement := range insertExecutionStatements {
db.Exec(statement)
}
Expand All @@ -209,6 +217,26 @@ func TestListWorkflowExecutions(t *testing.T) {
assert.Equal(t, len(resp.Executions), 4)
}

func TestListWorkflowExecutionsWithTags(t *testing.T) {
truncateAllTablesForTestingOnly()
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
populateWorkflowExecutionsForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

resp, err := client.ListExecutions(ctx, &admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
Project: "project1",
Domain: "domain1",
},
Limit: 5,
Filters: "value_in(admin_tag.name, hello)",
})
assert.Nil(t, err)
assert.Equal(t, len(resp.Executions), 2)
}
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

func TestListWorkflowExecutions_Filters(t *testing.T) {
truncateAllTablesForTestingOnly()
populateWorkflowExecutionsForTestingOnly()
Expand Down