Skip to content

Commit

Permalink
Set labels in workflow metadata (flyteorg#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
catalinii authored Oct 5, 2020
1 parent ba9f5fb commit 6567eca
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
32 changes: 32 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ func (m *ExecutionManager) launchSingleTaskExecution(
if request.Spec.Labels != nil {
executeTaskInputs.Labels = request.Spec.Labels.Values
}
executeTaskInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeTaskInputs.Labels)
if err != nil {
return nil, nil, err
}

if request.Spec.Annotations != nil {
executeTaskInputs.Annotations = request.Spec.Annotations.Values
}
Expand Down Expand Up @@ -632,6 +637,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if err != nil {
return nil, nil, err
}
executeWorkflowInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeWorkflowInputs.Labels)
if err != nil {
return nil, nil, err
}

err = m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name,
&executeWorkflowInputs)
if err != nil {
Expand Down Expand Up @@ -1346,3 +1356,25 @@ func NewExecutionManager(
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
}
}

// Adds project labels with higher precedence to workflow labels. Project labels are ignored if a corresponding label is set on the workflow.
func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName string, initialLabels map[string]string) (map[string]string, error) {
project, err := m.db.ProjectRepo().Get(ctx, projectName)
if err != nil {
logger.Errorf(ctx, "Failed to get project for [%+v] with error: %v", project, err)
return nil, err
}
// passing nil domain as not needed to retrieve labels
projectLabels := transformers.FromProjectModel(project, nil).Labels.GetValues()

if initialLabels == nil {
initialLabels = make(map[string]string)
}

for k, v := range projectLabels {
if _, ok := initialLabels[k]; !ok {
initialLabels[k] = v
}
}
return initialLabels, nil
}
13 changes: 13 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/lyft/flyteadmin/pkg/repositories/interfaces"
repositoryMocks "github.com/lyft/flyteadmin/pkg/repositories/mocks"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
runtimeIFaceMocks "github.com/lyft/flyteadmin/pkg/runtime/interfaces/mocks"
runtimeMocks "github.com/lyft/flyteadmin/pkg/runtime/mocks"
Expand Down Expand Up @@ -198,6 +199,17 @@ func getMockRepositoryForExecTest() repositories.RepositoryInterface {

func TestCreateExecution(t *testing.T) {
repository := getMockRepositoryForExecTest()
labels := admin.Labels{
Values: map[string]string{
"label3": "3",
"label2": "1", // common label, will be dropped
}}
repository.ProjectRepo().(*repositoryMocks.MockProjectRepo).GetFunction = func(
ctx context.Context, projectID string) (models.Project, error) {
return transformers.CreateProjectModel(&admin.Project{
Labels: &labels}), nil
}

principal := "principal"
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
Expand All @@ -214,6 +226,7 @@ func TestCreateExecution(t *testing.T) {
assert.EqualValues(t, map[string]string{
"label1": "1",
"label2": "2",
"label3": "3",
}, inputs.Labels)
assert.EqualValues(t, map[string]string{
"annotation3": "3",
Expand Down

0 comments on commit 6567eca

Please sign in to comment.