Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Set labels in workflow metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Catalin Toda committed Oct 5, 2020
1 parent c5425dd commit 0c0b505
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
35 changes: 35 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}

executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Expand All @@ -464,9 +465,15 @@ func (m *ExecutionManager) launchSingleTaskExecution(
Auth: request.Spec.AuthRole,
QueueingBudget: qualityOfService.QueuingBudget,
}

if request.Spec.Labels != nil {
executeTaskInputs.Labels = request.Spec.Labels.Values
}
executeTaskInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeTaskInputs.Labels)
if err != nil {
return nil, nil, err
}

if request.Spec.Annotations != nil {
executeTaskInputs.Annotations = request.Spec.Annotations.Values
}
Expand Down Expand Up @@ -632,6 +639,12 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if err != nil {
return nil, nil, err
}

executeWorkflowInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeWorkflowInputs.Labels)
if err != nil {
return nil, nil, err
}

err = m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name,
&executeWorkflowInputs)
if err != nil {
Expand Down Expand Up @@ -1346,3 +1359,25 @@ func NewExecutionManager(
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
}
}

func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName string, initialLabels map[string]string) (map[string]string, error) {
project, err := m.db.ProjectRepo().Get(ctx, projectName)
if err != nil {
logger.Errorf(ctx, "Failed to get project for [%+v] with error: %v", project, err)
return nil, err
}
// passing nil domain as not needed to retrieve labels
projectLabels := transformers.FromProjectModel(project, nil).Labels.GetValues()

if initialLabels == nil {
initialLabels = make(map[string]string)
}

// Add the project labels only if not set before
for k, v := range projectLabels {
if _, ok := initialLabels[k]; !ok {
initialLabels[k] = v
}
}
return initialLabels, nil
}
13 changes: 13 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/lyft/flyteadmin/pkg/repositories/interfaces"
repositoryMocks "github.com/lyft/flyteadmin/pkg/repositories/mocks"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
runtimeIFaceMocks "github.com/lyft/flyteadmin/pkg/runtime/interfaces/mocks"
runtimeMocks "github.com/lyft/flyteadmin/pkg/runtime/mocks"
Expand Down Expand Up @@ -198,6 +199,17 @@ func getMockRepositoryForExecTest() repositories.RepositoryInterface {

func TestCreateExecution(t *testing.T) {
repository := getMockRepositoryForExecTest()
labels := admin.Labels{
Values: map[string]string{
"label3": "3",
"label2": "1", // common label, will be dropped
}}
repository.ProjectRepo().(*repositoryMocks.MockProjectRepo).GetFunction = func(
ctx context.Context, projectID string) (models.Project, error) {
return transformers.CreateProjectModel(&admin.Project{
Labels: &labels}), nil
}

principal := "principal"
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
Expand All @@ -214,6 +226,7 @@ func TestCreateExecution(t *testing.T) {
assert.EqualValues(t, map[string]string{
"label1": "1",
"label2": "2",
"label3": "3",
}, inputs.Labels)
assert.EqualValues(t, map[string]string{
"annotation3": "3",
Expand Down
3 changes: 3 additions & 0 deletions pkg/workflowengine/impl/propeller_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ func TestExecuteWorkflowHappyCase(t *testing.T) {
MissingPluginBehavior: admin.PluginOverride_USE_DEFAULT,
},
},
ProjectLabels: map[string]string{
"customlabel": "labelval",
},
})
assert.Nil(t, err)
assert.NotNil(t, execInfo)
Expand Down
2 changes: 2 additions & 0 deletions pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ExecuteWorkflowInput struct {
Annotations map[string]string
QueueingBudget time.Duration
TaskPluginOverrides []*admin.PluginOverride
ProjectLabels map[string]string
}

type ExecuteTaskInput struct {
Expand All @@ -31,6 +32,7 @@ type ExecuteTaskInput struct {
Labels map[string]string
Annotations map[string]string
QueueingBudget time.Duration
ProjectLabels map[string]string
}

type TerminateWorkflowInput struct {
Expand Down

0 comments on commit 0c0b505

Please sign in to comment.