Skip to content

Commit

Permalink
Remove primary queue references (flyteorg#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Feb 13, 2020
1 parent d82e62e commit 6a64f00
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 44 deletions.
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/lyft/flytestdlib/cli/pflags"
_ "github.com/vektra/mockery/cmd/mockery"
_ "github.com/alvaroloes/enumer"
)
17 changes: 7 additions & 10 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,26 @@ storage:
container: "flyte"
queues:
executionQueues:
- primary: "gpu_primary"
dynamic: "gpu_dynamic"
- dynamic: "gpu_dynamic"
attributes:
- gpu
- primary: "critical"
dynamic: "critical"
- dynamic: "critical"
attributes:
- critical
- primary: "default"
dynamic: "default"
- dynamic: "default"
attributes:
- defaultclusters
- primary: "my_queue_1"
workflowConfigs
- project: "my_queue_1"
domain: "production"
workflowName: "my_workflow_1"
tags:
- critical
- primary: "my_queue_1"
domain: "production"
- project: "production"
workflowName: "my_workflow_2"
tags:
- gpu
- primary: "my_queue_3"
- project: "my_queue_3"
domain: "production"
workflowName: "my_workflow_3"
tags:
Expand Down
8 changes: 0 additions & 8 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/lyft/flyteadmin/pkg/manager/impl/shared"
)

const parentContainerQueueKey = "parent_queue"
const childContainerQueueKey = "child_queue"
const noSourceExecutionID = 0
const principalContextKeyFormat = "%v"
Expand Down Expand Up @@ -107,13 +106,6 @@ func (m *ExecutionManager) populateExecutionQueue(
// Unrecognized target type, nothing to do
continue
}
if queueConfig.PrimaryQueue != "" {
logger.Debugf(ctx, "Assigning %s as parent queue for task %+v", queueConfig.PrimaryQueue, task.Template.Id)
container.Config = append(container.Config, &core.KeyValuePair{
Key: parentContainerQueueKey,
Value: queueConfig.PrimaryQueue,
})
}

if queueConfig.DynamicQueue != "" {
logger.Debugf(ctx, "Assigning %s as child queue for task %+v", queueConfig.DynamicQueue, task.Template.Id)
Expand Down
9 changes: 3 additions & 6 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
testutils.GetApplicationConfigWithDefaultProjects(),
runtimeMocks.NewMockQueueConfigurationProvider([]runtimeInterfaces.ExecutionQueue{
{
Primary: "primary Q",
Dynamic: "dynamic Q",
Attributes: []string{"tag"},
},
Expand All @@ -349,11 +348,9 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
func(inputs workflowengineInterfaces.ExecuteWorkflowInput) (*workflowengineInterfaces.ExecutionInfo, error) {
assert.NotEmpty(t, inputs.WfClosure.Tasks)
for _, task := range inputs.WfClosure.Tasks {
assert.Len(t, task.Template.GetContainer().Config, 2)
assert.Contains(t, parentContainerQueueKey, task.Template.GetContainer().Config[0].Key)
assert.Contains(t, "primary Q", task.Template.GetContainer().Config[0].Value)
assert.Contains(t, childContainerQueueKey, task.Template.GetContainer().Config[1].Key)
assert.Contains(t, "dynamic Q", task.Template.GetContainer().Config[1].Value)
assert.Len(t, task.Template.GetContainer().Config, 1)
assert.Contains(t, childContainerQueueKey, task.Template.GetContainer().Config[0].Key)
assert.Contains(t, "dynamic Q", task.Template.GetContainer().Config[0].Value)
}
assert.Equal(t, requestedAt, inputs.AcceptedAt)
return &workflowengineInterfaces.ExecutionInfo{
Expand Down
2 changes: 0 additions & 2 deletions pkg/manager/impl/executions/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
type tag = string

type singleQueueConfiguration struct {
PrimaryQueue string
DynamicQueue string
}

Expand Down Expand Up @@ -49,7 +48,6 @@ func (q *queueAllocatorImpl) refreshExecutionQueues(executionQueues []runtimeInt
queuesForTag = make(queues, 0, 1)
}
queueConfigMap[tag] = append(queuesForTag, singleQueueConfiguration{
PrimaryQueue: queue.Primary,
DynamicQueue: queue.Dynamic,
})
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/manager/impl/executions/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const testWorkflow = "name"
func TestGetQueue(t *testing.T) {
executionQueues := []runtimeInterfaces.ExecutionQueue{
{
Primary: "queue primary",
Dynamic: "queue dynamic",
Attributes: []string{"attribute"},
},
Expand Down Expand Up @@ -68,7 +67,6 @@ func TestGetQueue(t *testing.T) {
nil, runtimeMocks.NewMockQueueConfigurationProvider(executionQueues, nil),
nil, nil, nil, nil), db)
queueConfig := singleQueueConfiguration{
PrimaryQueue: "queue primary",
DynamicQueue: "queue dynamic",
}
assert.Equal(t, queueConfig, queueAllocator.GetQueue(context.Background(), core.Identifier{
Expand Down Expand Up @@ -96,22 +94,18 @@ func TestGetQueue(t *testing.T) {
func TestGetQueueDefaults(t *testing.T) {
executionQueues := []runtimeInterfaces.ExecutionQueue{
{
Primary: "queue1 primary",
Dynamic: "queue1 dynamic",
Attributes: []string{"attr1"},
},
{
Primary: "queue2 primary",
Dynamic: "queue2 dynamic",
Attributes: []string{"attr2"},
},
{
Primary: "queue3 primary",
Dynamic: "queue3 dynamic",
Attributes: []string{"attr3"},
},
{
Primary: "default primary",
Dynamic: "default dynamic",
Attributes: []string{"default"},
},
Expand Down Expand Up @@ -180,7 +174,6 @@ func TestGetQueueDefaults(t *testing.T) {
nil, runtimeMocks.NewMockQueueConfigurationProvider(executionQueues, workflowConfigs), nil,
nil, nil, nil), db)
assert.Equal(t, singleQueueConfiguration{
PrimaryQueue: "default primary",
DynamicQueue: "default dynamic",
}, queueAllocator.GetQueue(
context.Background(), core.Identifier{
Expand All @@ -189,7 +182,6 @@ func TestGetQueueDefaults(t *testing.T) {
Name: "workflow",
}))
assert.EqualValues(t, singleQueueConfiguration{
PrimaryQueue: "queue1 primary",
DynamicQueue: "queue1 dynamic",
}, queueAllocator.GetQueue(
context.Background(), core.Identifier{
Expand All @@ -198,7 +190,6 @@ func TestGetQueueDefaults(t *testing.T) {
Name: "workflow",
}))
assert.EqualValues(t, singleQueueConfiguration{
PrimaryQueue: "queue2 primary",
DynamicQueue: "queue2 dynamic",
}, queueAllocator.GetQueue(
context.Background(), core.Identifier{
Expand All @@ -207,7 +198,6 @@ func TestGetQueueDefaults(t *testing.T) {
Name: "UNMATCHED",
}))
assert.Equal(t, singleQueueConfiguration{
PrimaryQueue: "queue3 primary",
DynamicQueue: "queue3 dynamic",
}, queueAllocator.GetQueue(
context.Background(), core.Identifier{
Expand Down
1 change: 0 additions & 1 deletion pkg/runtime/interfaces/queue_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package interfaces
// Holds details about a queue used for task execution.
// Matching attributes determine which workflows' tasks will run where.
type ExecutionQueue struct {
Primary string
Dynamic string
Attributes []string
}
Expand Down
9 changes: 3 additions & 6 deletions script/integration/k8s/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,13 @@ data:
storage: 10G
queues:
executionQueues:
- primary: "gpu_primary"
dynamic: "gpu_dynamic"
- dynamic: "gpu_dynamic"
attributes:
- gpu
- primary: "critical"
dynamic: "critical"
- dynamic: "critical"
attributes:
- critical
- primary: "default"
dynamic: "default"
- dynamic: "default"
attributes:
- default
workflowConfigs:
Expand Down

0 comments on commit 6a64f00

Please sign in to comment.