Skip to content

Commit

Permalink
Use ResourceManager to aid in validating task resources (flyteorg#517)
Browse files Browse the repository at this point in the history
* using resource manager to validate task resources

Signed-off-by: Daniel Rammer <[email protected]>

* moving unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* docs

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Feb 14, 2023
1 parent 960ea1f commit 4b712bd
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 348 deletions.
103 changes: 3 additions & 100 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (

"github.com/flyteorg/flyteadmin/auth"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"

dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces"
Expand Down Expand Up @@ -186,39 +184,6 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *
return nil, nil
}

type completeTaskResources struct {
Defaults runtimeInterfaces.TaskResourceSet
Limits runtimeInterfaces.TaskResourceSet
}

func getTaskResourcesAsSet(ctx context.Context, identifier *core.Identifier,
resourceEntries []*core.Resources_ResourceEntry, resourceName string) runtimeInterfaces.TaskResourceSet {

result := runtimeInterfaces.TaskResourceSet{}
for _, entry := range resourceEntries {
switch entry.Name {
case core.Resources_CPU:
result.CPU = parseQuantityNoError(ctx, identifier.String(), fmt.Sprintf("%v.cpu", resourceName), entry.Value)
case core.Resources_MEMORY:
result.Memory = parseQuantityNoError(ctx, identifier.String(), fmt.Sprintf("%v.memory", resourceName), entry.Value)
case core.Resources_EPHEMERAL_STORAGE:
result.EphemeralStorage = parseQuantityNoError(ctx, identifier.String(),
fmt.Sprintf("%v.ephemeral storage", resourceName), entry.Value)
case core.Resources_GPU:
result.GPU = parseQuantityNoError(ctx, identifier.String(), "gpu", entry.Value)
}
}

return result
}

func getCompleteTaskResourceRequirements(ctx context.Context, identifier *core.Identifier, task *core.CompiledTask) completeTaskResources {
return completeTaskResources{
Defaults: getTaskResourcesAsSet(ctx, identifier, task.GetTemplate().GetContainer().Resources.Requests, "requests"),
Limits: getTaskResourcesAsSet(ctx, identifier, task.GetTemplate().GetContainer().Resources.Limits, "limits"),
}
}

// TODO: Delete this code usage after the flyte v0.17.0 release
// Assumes input contains a compiled task with a valid container resource execConfig.
//
Expand Down Expand Up @@ -254,7 +219,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
// The IDL representation for container-type tasks represents resources as a list with string quantities.
// In order to easily reason about them we convert them to a set where we can O(1) fetch specific resources (e.g. CPU)
// and represent them as comparable quantities rather than strings.
taskResourceRequirements := getCompleteTaskResourceRequirements(ctx, task.Template.Id, task)
taskResourceRequirements := util.GetCompleteTaskResourceRequirements(ctx, task.Template.Id, task)

cpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.CPU, taskResourceRequirements.Limits.CPU,
platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU)
Expand Down Expand Up @@ -334,68 +299,6 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
}
}

func parseQuantityNoError(ctx context.Context, ownerID, name, value string) resource.Quantity {
q, err := resource.ParseQuantity(value)
if err != nil {
logger.Infof(ctx, "Failed to parse owner's [%s] resource [%s]'s value [%s] with err: %v", ownerID, name, value, err)
}

return q
}

func fromAdminProtoTaskResourceSpec(ctx context.Context, spec *admin.TaskResourceSpec) runtimeInterfaces.TaskResourceSet {
result := runtimeInterfaces.TaskResourceSet{}
if len(spec.Cpu) > 0 {
result.CPU = parseQuantityNoError(ctx, "project", "cpu", spec.Cpu)
}

if len(spec.Memory) > 0 {
result.Memory = parseQuantityNoError(ctx, "project", "memory", spec.Memory)
}

if len(spec.Storage) > 0 {
result.Storage = parseQuantityNoError(ctx, "project", "storage", spec.Storage)
}

if len(spec.EphemeralStorage) > 0 {
result.EphemeralStorage = parseQuantityNoError(ctx, "project", "ephemeral storage", spec.EphemeralStorage)
}

if len(spec.Gpu) > 0 {
result.GPU = parseQuantityNoError(ctx, "project", "gpu", spec.Gpu)
}

return result
}

func (m *ExecutionManager) getTaskResources(ctx context.Context, workflow *core.Identifier) workflowengineInterfaces.TaskResources {
resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: workflow.Project,
Domain: workflow.Domain,
Workflow: workflow.Name,
ResourceType: admin.MatchableResource_TASK_RESOURCE,
})

if err != nil {
logger.Warningf(ctx, "Failed to fetch override values when assigning task resource default values for [%+v]: %v",
workflow, err)
}

logger.Debugf(ctx, "Assigning task requested resources for [%+v]", workflow)
var taskResourceAttributes = workflowengineInterfaces.TaskResources{}
if resource != nil && resource.Attributes != nil && resource.Attributes.GetTaskResourceAttributes() != nil {
taskResourceAttributes.Defaults = fromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Defaults)
taskResourceAttributes.Limits = fromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Limits)
} else {
taskResourceAttributes = workflowengineInterfaces.TaskResources{
Defaults: m.config.TaskResourceConfiguration().GetDefaults(),
Limits: m.config.TaskResourceConfiguration().GetLimits(),
}
}

return taskResourceAttributes
}

// Fetches inherited execution metadata including the parent node execution db model id and the source execution model id
// as well as sets request spec metadata with the inherited principal and adjusted nesting data.
func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, requestSpec *admin.ExecutionSpec,
Expand Down Expand Up @@ -612,7 +515,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
}

// Dynamically assign task resource defaults.
platformTaskResources := m.getTaskResources(ctx, workflow.Id)
platformTaskResources := util.GetTaskResources(ctx, workflow.Id, m.resourceManager, m.config.TaskResourceConfiguration())
for _, t := range workflow.Closure.CompiledWorkflow.Tasks {
m.setCompiledTaskDefaults(ctx, t, platformTaskResources)
}
Expand Down Expand Up @@ -863,8 +766,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, err
}

platformTaskResources := m.getTaskResources(ctx, workflow.Id)
// Dynamically assign task resource defaults.
platformTaskResources := util.GetTaskResources(ctx, workflow.Id, m.resourceManager, m.config.TaskResourceConfiguration())
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
m.setCompiledTaskDefaults(ctx, task, platformTaskResources)
}
Expand Down
200 changes: 0 additions & 200 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3725,90 +3725,6 @@ func TestListExecutions_LegacyModel(t *testing.T) {
assert.Empty(t, executionList.Token)
}

func TestGetTaskResourcesAsSet(t *testing.T) {
taskResources := getTaskResourcesAsSet(context.TODO(), &core.Identifier{}, []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "100",
},
{
Name: core.Resources_MEMORY,
Value: "200",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "300",
},
{
Name: core.Resources_GPU,
Value: "400",
},
}, "request")
assert.True(t, taskResources.CPU.Equal(resource.MustParse("100")))
assert.True(t, taskResources.Memory.Equal(resource.MustParse("200")))
assert.True(t, taskResources.EphemeralStorage.Equal(resource.MustParse("300")))
assert.True(t, taskResources.GPU.Equal(resource.MustParse("400")))
}

func TestGetCompleteTaskResourceRequirements(t *testing.T) {
taskResources := getCompleteTaskResourceRequirements(context.TODO(), &core.Identifier{}, &core.CompiledTask{
Template: &core.TaskTemplate{
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Resources: &core.Resources{
Requests: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "100",
},
{
Name: core.Resources_MEMORY,
Value: "200",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "300",
},
{
Name: core.Resources_GPU,
Value: "400",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "200",
},
{
Name: core.Resources_MEMORY,
Value: "400",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "600",
},
{
Name: core.Resources_GPU,
Value: "800",
},
},
},
},
},
},
})

assert.True(t, taskResources.Defaults.CPU.Equal(resource.MustParse("100")))
assert.True(t, taskResources.Defaults.Memory.Equal(resource.MustParse("200")))
assert.True(t, taskResources.Defaults.EphemeralStorage.Equal(resource.MustParse("300")))
assert.True(t, taskResources.Defaults.GPU.Equal(resource.MustParse("400")))

assert.True(t, taskResources.Limits.CPU.Equal(resource.MustParse("200")))
assert.True(t, taskResources.Limits.Memory.Equal(resource.MustParse("400")))
assert.True(t, taskResources.Limits.EphemeralStorage.Equal(resource.MustParse("600")))
assert.True(t, taskResources.Limits.GPU.Equal(resource.MustParse("800")))
}

func TestSetDefaults(t *testing.T) {
task := &core.CompiledTask{
Template: &core.TaskTemplate{
Expand Down Expand Up @@ -5379,122 +5295,6 @@ func TestResolvePermissions(t *testing.T) {
})
}

func TestGetTaskResources(t *testing.T) {
taskConfig := runtimeMocks.MockTaskResourceConfiguration{}
taskConfig.Defaults = runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("200m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("200Gi"),
EphemeralStorage: resource.MustParse("500Mi"),
Storage: resource.MustParse("400Mi"),
}
taskConfig.Limits = runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
Storage: resource.MustParse("450Mi"),
}
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig,
runtimeMocks.NewMockWhitelistConfiguration(), nil)

t.Run("use runtime application values", func(t *testing.T) {
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfig, getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
taskResourceAttrs := execManager.(*ExecutionManager).getTaskResources(context.TODO(), &workflowIdentifier)
assert.EqualValues(t, taskResourceAttrs, workflowengineInterfaces.TaskResources{
Defaults: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("200m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("200Gi"),
EphemeralStorage: resource.MustParse("500Mi"),
Storage: resource.MustParse("400Mi"),
},
Limits: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
Storage: resource.MustParse("450Mi"),
},
})
})
t.Run("use specific overrides", func(t *testing.T) {
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Workflow: workflowIdentifier.Name,
ResourceType: admin.MatchableResource_TASK_RESOURCE,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_TaskResourceAttributes{
TaskResourceAttributes: &admin.TaskResourceAttributes{
Defaults: &admin.TaskResourceSpec{
Cpu: "1200m",
Gpu: "18",
Memory: "1200Gi",
EphemeralStorage: "1500Mi",
Storage: "1400Mi",
},
Limits: &admin.TaskResourceSpec{
Cpu: "300m",
Gpu: "8",
Memory: "500Gi",
EphemeralStorage: "501Mi",
Storage: "450Mi",
},
},
},
},
}, nil
}
executionManager := ExecutionManager{
resourceManager: &resourceManager,
config: mockConfig,
}
taskResourceAttrs := executionManager.getTaskResources(context.TODO(), &workflowIdentifier)
assert.EqualValues(t, taskResourceAttrs, workflowengineInterfaces.TaskResources{
Defaults: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("1200m"),
GPU: resource.MustParse("18"),
Memory: resource.MustParse("1200Gi"),
EphemeralStorage: resource.MustParse("1500Mi"),
Storage: resource.MustParse("1400Mi"),
},
Limits: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
Storage: resource.MustParse("450Mi"),
},
})
})
}

func TestFromAdminProtoTaskResourceSpec(t *testing.T) {
taskResourceSet := fromAdminProtoTaskResourceSpec(context.TODO(), &admin.TaskResourceSpec{
Cpu: "1",
Memory: "100",
Storage: "200",
EphemeralStorage: "300",
Gpu: "2",
})
assert.EqualValues(t, runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("1"),
Memory: resource.MustParse("100"),
Storage: resource.MustParse("200"),
EphemeralStorage: resource.MustParse("300"),
GPU: resource.MustParse("2"),
}, taskResourceSet)
}

func TestAddStateFilter(t *testing.T) {
t.Run("empty filters", func(t *testing.T) {
var filters []common.InlineFilter
Expand Down
Loading

0 comments on commit 4b712bd

Please sign in to comment.