Skip to content

Commit

Permalink
Use quantity rather than string in task resource config (flyteorg#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Sep 2, 2021
1 parent 79ef0dc commit 52838b2
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 188 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.19.22
github.com/flyteorg/flyteplugins v0.5.69
github.com/flyteorg/flytepropeller v0.13.18-0.20210826203304-d26afb434f09
github.com/flyteorg/flyteplugins v0.5.72
github.com/flyteorg/flytepropeller v0.13.20
github.com/flyteorg/flytestdlib v0.3.34
github.com/ghodss/yaml v1.0.0
github.com/gofrs/uuid v4.0.0+incompatible // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.22 h1:e3M0Dob/r5n+AJfAByzad/svMAVes7XfZVxUNCi6AeQ=
github.com/flyteorg/flyteidl v0.19.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.69 h1:i1V1n+uoI5TrBG/UWD6vzJ/fFAtru9FSYbjCnYBttUc=
github.com/flyteorg/flyteplugins v0.5.69/go.mod h1:YjahYP+i4/Qn+dFvxMOGbhDtkQT4EiH4Kb88KNK505A=
github.com/flyteorg/flytepropeller v0.13.18-0.20210826203304-d26afb434f09 h1:jpPDC4iyKLZIHyZ8IqvGj0Fc4L+J9DKfUv0HtGJp27c=
github.com/flyteorg/flytepropeller v0.13.18-0.20210826203304-d26afb434f09/go.mod h1:rexeH993JAGVVHFuidTL7+ANXIwhX0cdhgoitTwLGxI=
github.com/flyteorg/flyteplugins v0.5.72 h1:mtbpn4nuFrhZ2DadUlLxjK2RQ9FBFqDf0LaXVpe2hyY=
github.com/flyteorg/flyteplugins v0.5.72/go.mod h1:YjahYP+i4/Qn+dFvxMOGbhDtkQT4EiH4Kb88KNK505A=
github.com/flyteorg/flytepropeller v0.13.20 h1:nxQs6eVxloZI7g+YWOFUP9Ro00XNyR5dB+xt+xc04oc=
github.com/flyteorg/flytepropeller v0.13.20/go.mod h1:WNIZ2xrvzDjK6rQdcIdoVdjOWa0KL9zhSani72OV21M=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.33/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/flyteorg/flytestdlib v0.3.34 h1:OOuV03X8c1AWInzBU6IRsqpEF6y8WDJngbPcdL4VktY=
Expand Down
124 changes: 84 additions & 40 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,26 +235,46 @@ func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask,
taskResourceLimits := runtimeInterfaces.TaskResourceSet{}

// For resource values, we prefer to use the limits set in the application config over the set resource values.
if len(configResourceLimits.CPU) > 0 {
cpuLimit = configResourceLimits.CPU
} else if cpuIndex >= 0 {
cpuLimit = resourceRequestEntries[cpuIndex].Value
}
taskResourceLimits.CPU = cpuLimit
if len(configResourceLimits.Memory) > 0 {
memoryLimit = configResourceLimits.Memory
} else if memoryIndex >= 0 {
memoryLimit = resourceRequestEntries[memoryIndex].Value
}
taskResourceLimits.Memory = memoryLimit
if len(taskResourceLimits.GPU) == 0 && len(configResourceLimits.GPU) > 0 {
if !configResourceLimits.CPU.IsZero() {
taskResourceLimits.CPU = configResourceLimits.CPU
} else {
if cpuIndex >= 0 {
cpuLimit = resourceRequestEntries[cpuIndex].Value
}
cpu, err := resource.ParseQuantity(cpuLimit)
if err != nil {
logger.Errorf(ctx, "Failed to parse user cpu limit from task spec [%s] with err [%+v]", cpuLimit, err)
} else {
taskResourceLimits.CPU = cpu
}
}
if !configResourceLimits.Memory.IsZero() {
taskResourceLimits.Memory = configResourceLimits.Memory
} else {
if memoryIndex >= 0 {
memoryLimit = resourceRequestEntries[memoryIndex].Value
}
memory, err := resource.ParseQuantity(memoryLimit)
if err != nil {
logger.Errorf(ctx, "Failed to parse user memory limit from task spec [%s] with err [%+v]", memoryLimit, err)
} else {
taskResourceLimits.Memory = memory
}
}
if !taskResourceLimits.GPU.IsZero() && !configResourceLimits.GPU.IsZero() {
// When a platform default for GPU exists, but one isn't set in the task resources, use the platform value.
taskResourceLimits.GPU = configResourceLimits.GPU
}
if len(configResourceLimits.EphemeralStorage) > 0 {
if !configResourceLimits.EphemeralStorage.IsZero() {
taskResourceLimits.EphemeralStorage = configResourceLimits.EphemeralStorage
} else if ephemeralStorageIndex >= 0 {
taskResourceLimits.EphemeralStorage = resourceRequestEntries[ephemeralStorageIndex].Value
ephemeralStorage, err := resource.ParseQuantity(resourceRequestEntries[ephemeralStorageIndex].Value)
if err != nil {
logger.Errorf(ctx, "Failed to parse user ephemeral storage limit from task spec [%s] with err [%+v]",
resourceRequestEntries[ephemeralStorageIndex].Value, err)
} else {
taskResourceLimits.EphemeralStorage = ephemeralStorage
}
}

return taskResourceLimits
Expand All @@ -279,9 +299,9 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
return resourceEntries
}

if cpuIndex < 0 && len(platformValues.CPU) > 0 {
if cpuIndex < 0 && !platformValues.CPU.IsZero() {
logger.Debugf(ctx, "Setting 'cpu' for [%+v] to %s", identifier, platformValues.CPU)
cpuValue := platformValues.CPU
cpuValue := platformValues.CPU.String()
if taskResourceSpec != nil && len(taskResourceSpec.Cpu) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
cpuValue = taskResourceSpec.Cpu
Expand All @@ -292,8 +312,8 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
}
resourceEntries = append(resourceEntries, cpuResource)
}
if memoryIndex < 0 && len(platformValues.Memory) > 0 {
memoryValue := platformValues.Memory
if memoryIndex < 0 && !platformValues.Memory.IsZero() {
memoryValue := platformValues.Memory.String()
if taskResourceSpec != nil && len(taskResourceSpec.Memory) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
memoryValue = taskResourceSpec.Memory
Expand All @@ -310,8 +330,8 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
if taskResourceSpec != nil && len(taskResourceSpec.EphemeralStorage) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
ephemeralStorageValue = taskResourceSpec.EphemeralStorage
} else if len(platformValues.EphemeralStorage) > 0 {
ephemeralStorageValue = platformValues.EphemeralStorage
} else if !platformValues.EphemeralStorage.IsZero() {
ephemeralStorageValue = platformValues.EphemeralStorage.String()
}
if len(ephemeralStorageValue) > 0 {
ephemeralStorageResource := &core.Resources_ResourceEntry{
Expand Down Expand Up @@ -416,7 +436,43 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
checkTaskRequestsLessThanLimits(ctx, task.Template.Id, task.Template.GetContainer().Resources)
}

func (m *ExecutionManager) getTaskResources(ctx context.Context, workflow *core.Identifier) *admin.TaskResourceAttributes {
// fromAdminProtoTaskResourceSpec converts the string-valued resource fields of an
// admin.TaskResourceSpec into a runtimeInterfaces.TaskResourceSet of parsed quantities.
//
// Empty fields are skipped and left at their zero value in the result. A field that
// fails to parse is logged at info level and the conversion continues with the
// remaining fields, so the function is best-effort and never returns an error.
//
// A nil spec yields an empty TaskResourceSet. Callers pass the Defaults and Limits
// members of a TaskResourceAttributes message, and either of those may be unset even
// when the parent message is present.
func fromAdminProtoTaskResourceSpec(ctx context.Context, spec *admin.TaskResourceSpec) runtimeInterfaces.TaskResourceSet {
	result := runtimeInterfaces.TaskResourceSet{}
	if spec == nil {
		// Nothing to convert; avoid dereferencing a nil proto message below.
		return result
	}

	// parse converts a single resource value. Whatever ParseQuantity returns is handed
	// back as-is, including on failure, so the behavior matches a direct assignment of
	// its result to the destination field.
	parse := func(name, value string) resource.Quantity {
		quantity, err := resource.ParseQuantity(value)
		if err != nil {
			logger.Infof(ctx, "Failed to parse %s [%s] from task resource spec with err: %v", name, value, err)
		}
		return quantity
	}

	if len(spec.Cpu) > 0 {
		result.CPU = parse("cpu", spec.Cpu)
	}
	if len(spec.Memory) > 0 {
		result.Memory = parse("memory", spec.Memory)
	}
	if len(spec.Storage) > 0 {
		result.Storage = parse("storage", spec.Storage)
	}
	if len(spec.EphemeralStorage) > 0 {
		result.EphemeralStorage = parse("ephemeral storage", spec.EphemeralStorage)
	}
	if len(spec.Gpu) > 0 {
		result.GPU = parse("gpu", spec.Gpu)
	}
	return result
}

func (m *ExecutionManager) getTaskResources(ctx context.Context, workflow *core.Identifier) *workflowengineInterfaces.TaskResources {
resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: workflow.Project,
Domain: workflow.Domain,
Expand All @@ -429,26 +485,14 @@ func (m *ExecutionManager) getTaskResources(ctx context.Context, workflow *core.
workflow, err)
}
logger.Debugf(ctx, "Assigning task requested resources for [%+v]", workflow)
var taskResourceAttributes = &admin.TaskResourceAttributes{}
var taskResourceAttributes = &workflowengineInterfaces.TaskResources{}
if resource != nil && resource.Attributes != nil && resource.Attributes.GetTaskResourceAttributes() != nil {
taskResourceAttributes.Defaults = resource.Attributes.GetTaskResourceAttributes().Defaults
taskResourceAttributes.Limits = resource.Attributes.GetTaskResourceAttributes().Limits
taskResourceAttributes.Defaults = fromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Defaults)
taskResourceAttributes.Limits = fromAdminProtoTaskResourceSpec(ctx, resource.Attributes.GetTaskResourceAttributes().Limits)
} else {
taskResourceAttributes = &admin.TaskResourceAttributes{
Defaults: &admin.TaskResourceSpec{
Cpu: m.config.TaskResourceConfiguration().GetDefaults().CPU,
Memory: m.config.TaskResourceConfiguration().GetDefaults().Memory,
EphemeralStorage: m.config.TaskResourceConfiguration().GetDefaults().EphemeralStorage,
Storage: m.config.TaskResourceConfiguration().GetDefaults().Storage,
Gpu: m.config.TaskResourceConfiguration().GetDefaults().GPU,
},
Limits: &admin.TaskResourceSpec{
Cpu: m.config.TaskResourceConfiguration().GetLimits().CPU,
Memory: m.config.TaskResourceConfiguration().GetLimits().Memory,
EphemeralStorage: m.config.TaskResourceConfiguration().GetLimits().EphemeralStorage,
Storage: m.config.TaskResourceConfiguration().GetLimits().Storage,
Gpu: m.config.TaskResourceConfiguration().GetLimits().GPU,
},
taskResourceAttributes = &workflowengineInterfaces.TaskResources{
Defaults: m.config.TaskResourceConfiguration().GetDefaults(),
Limits: m.config.TaskResourceConfiguration().GetLimits(),
}
}
return taskResourceAttributes
Expand Down
Loading

0 comments on commit 52838b2

Please sign in to comment.