Skip to content

Commit

Permalink
Fast fail if task resource requests exceed k8s resource limits (flyte…
Browse files Browse the repository at this point in the history
…org#488)

* checking if task resource requests exceed k8s limits

Signed-off-by: Daniel Rammer <[email protected]>

* added better message to task failure

Signed-off-by: Daniel Rammer <[email protected]>

* added request checks

Signed-off-by: Daniel Rammer <[email protected]>

* added tests for checking resource eligibility

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated comment

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored May 5, 2023
1 parent 4faaa32 commit ca038b5
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 29 deletions.
41 changes: 30 additions & 11 deletions flytepropeller/pkg/controller/nodes/task/backoff/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/flyteorg/flyteplugins/go/tasks/errors"
stdAtomic "github.com/flyteorg/flytestdlib/atomic"
stdErrors "github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/logger"
v1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -18,7 +17,10 @@ import (
)

var (
reqRegexp = regexp.MustCompile(`requested: (limits.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
limitedLimitsRegexp = regexp.MustCompile(`limited: (limits.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
limitedRequestsRegexp = regexp.MustCompile(`limited: (requests.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
requestedLimitsRegexp = regexp.MustCompile(`requested: (limits.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
requestedRequestsRegexp = regexp.MustCompile(`requested: (requests.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
)

// SimpleBackOffBlocker is a simple exponential back-off timer that keeps track of the back-off period
Expand Down Expand Up @@ -171,7 +173,7 @@ func (h *ComputeResourceAwareBackOffHandler) Handle(ctx context.Context, operati
// It is necessary to parse the error message to get the actual constraints
// in this case, if the error message indicates constraints on memory only, then we shouldn't be used to lower the CPU ceiling
// even if CPU appears in requestedResourceList
newCeiling := GetComputeResourceAndQuantityRequested(err)
newCeiling := GetComputeResourceAndQuantity(err, requestedLimitsRegexp)
h.ComputeResourceCeilings.updateAll(&newCeiling)
}

Expand All @@ -196,18 +198,18 @@ func IsBackOffError(err error) bool {
return IsResourceQuotaExceeded(err) || apiErrors.IsTooManyRequests(err) || apiErrors.IsServerTimeout(err)
}

func GetComputeResourceAndQuantityRequested(err error) v1.ResourceList {
func GetComputeResourceAndQuantity(err error, resourceRegex *regexp.Regexp) v1.ResourceList {
// Playground: https://play.golang.org/p/oOr6CMmW7IE

// Sample message:
// "requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi"

// Extracting "requested: limits.cpu=7,limits.memory=64Gi"
matches := reqRegexp.FindAllStringSubmatch(err.Error(), -1)
requestedComputeResources := v1.ResourceList{}
matches := resourceRegex.FindAllStringSubmatch(err.Error(), -1)
computeResources := v1.ResourceList{}

if len(matches) == 0 || len(matches[0]) == 0 {
return requestedComputeResources
return computeResources
}

// Extracting "limits.cpu=7,limits.memory=64Gi"
Expand All @@ -226,11 +228,28 @@ func GetComputeResourceAndQuantityRequested(err error) v1.ResourceList {
if len(tuple) < 2 {
continue
}
requestedComputeResources[v1.ResourceName(tuple[0])] = resource.MustParse(tuple[1])
computeResources[v1.ResourceName(tuple[0])] = resource.MustParse(tuple[1])
}
return requestedComputeResources
return computeResources
}

func IsBackoffError(err error) bool {
return stdErrors.IsCausedBy(err, errors.BackOffError)
func IsResourceRequestsEligible(err error) bool {
limitedLimitsResourceList := GetComputeResourceAndQuantity(err, limitedLimitsRegexp)
limitedRequestsResourceList := GetComputeResourceAndQuantity(err, limitedRequestsRegexp)
requestedLimitsResourceList := GetComputeResourceAndQuantity(err, requestedLimitsRegexp)
requestedRequestsResourceList := GetComputeResourceAndQuantity(err, requestedRequestsRegexp)

return isEligible(requestedLimitsResourceList, limitedLimitsResourceList) &&
isEligible(requestedRequestsResourceList, limitedRequestsResourceList)
}

func isEligible(requestedResourceList, quotaResourceList v1.ResourceList) (eligibility bool) {
for resource, requestedQuantity := range requestedResourceList {
quotaQuantity, exists := quotaResourceList[resource]
if exists && requestedQuantity.Cmp(quotaQuantity) >= 0 {
return false
}
}

return true
}
125 changes: 113 additions & 12 deletions flytepropeller/pkg/controller/nodes/task/backoff/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"reflect"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -344,32 +345,79 @@ func TestComputeResourceCeilings_updateAll(t *testing.T) {

func TestGetComputeResourceAndQuantityRequested(t *testing.T) {
type args struct {
err error
err error
regexp *regexp.Regexp
}
tests := []struct {
name string
args args
want v1.ResourceList
}{
{name: "Memory request", args: args{err: apiErrors.NewForbidden(
{name: "Limited memory limits", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.memory=3Gi, "+
"used: limits.memory=7976Gi, limited: limits.memory=8000Gi"))},
"exceeded quota: project-quota, requested: limits.memory=3Gi, used: limits.memory=7976Gi, limited: limits.memory=8000Gi")),
regexp: limitedLimitsRegexp},
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("8000Gi")}},
{name: "Limited CPU limits", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.cpu=3640m, used: limits.cpu=6000m, limited: limits.cpu=8000m")),
regexp: limitedLimitsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("8000m")}},
{name: "Limited multiple limits ", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi")),
regexp: limitedLimitsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250"), v1.ResourceMemory: resource.MustParse("2000Gi")}},
{name: "Limited memory requests", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: requests.memory=3Gi, used: requests.memory=7976Gi, limited: requests.memory=8000Gi")),
regexp: limitedRequestsRegexp},
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("8000Gi")}},
{name: "Limited CPU requests", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: requests.cpu=3640m, used: requests.cpu=6000m, limited: requests.cpu=8000m")),
regexp: limitedRequestsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("8000m")}},
{name: "Limited multiple requests ", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: requests.cpu=7,requests.memory=64Gi, used: requests.cpu=249,requests.memory=2012730Mi, limited: requests.cpu=250,requests.memory=2000Gi")),
regexp: limitedRequestsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250"), v1.ResourceMemory: resource.MustParse("2000Gi")}},
{name: "Requested memory limits", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.memory=3Gi, used: limits.memory=7976Gi, limited: limits.memory=8000Gi")),
regexp: requestedLimitsRegexp},
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")}},
{name: "Requested CPU limits", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.cpu=3640m, used: limits.cpu=6000m, limited: limits.cpu=8000m")),
regexp: requestedLimitsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3640m")}},
{name: "Requested multiple limits ", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi")),
regexp: requestedLimitsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7"), v1.ResourceMemory: resource.MustParse("64Gi")}},
{name: "Requested memory requests", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: requests.memory=3Gi, used: requests.memory=7976Gi, limited: requests.memory=8000Gi")),
regexp: requestedRequestsRegexp},
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")}},
{name: "CPU request", args: args{err: apiErrors.NewForbidden(
{name: "Requested CPU requests", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.cpu=3640m, "+
"used: limits.cpu=6000m, limited: limits.cpu=8000m"))},
"exceeded quota: project-quota, requested: requests.cpu=3640m, used: requests.cpu=6000m, limited: requests.cpu=8000m")),
regexp: requestedRequestsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3640m")}},
{name: "Multiple resources ", args: args{err: apiErrors.NewForbidden(
{name: "Requested multiple requests ", args: args{err: apiErrors.NewForbidden(
schema.GroupResource{}, "", errors.New("is forbidden: "+
"exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi"))},
"exceeded quota: project-quota, requested: requests.cpu=7,requests.memory=64Gi, used: requests.cpu=249,requests.memory=2012730Mi, limited: requests.cpu=250,requests.memory=2000Gi")),
regexp: requestedRequestsRegexp},
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7"), v1.ResourceMemory: resource.MustParse("64Gi")}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetComputeResourceAndQuantityRequested(tt.args.err); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetComputeResourceAndQuantityRequested() = %v, want %v", got, tt.want)
if got := GetComputeResourceAndQuantity(tt.args.err, tt.args.regexp); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetComputeResourceAndQuantity() = %v, want %v", got, tt.want)
}
})
}
Expand All @@ -390,7 +438,7 @@ func TestIsBackoffError(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsBackoffError(tt.args.err); got != tt.want {
if got := stdlibErrors.IsCausedBy(tt.args.err, taskErrors.BackOffError); got != tt.want {
t.Errorf("IsBackoffError() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -516,3 +564,56 @@ func TestErrorTypes(t *testing.T) {
assert.True(t, res)
})
}

func TestIsEligible(t *testing.T) {
type args struct {
requested v1.ResourceList
quota v1.ResourceList
}
tests := []struct {
name string
args args
want bool
}{
{
name: "CPUElgible",
args: args{
requested: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
quota: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
},
want: true,
},
{
name: "CPUInelgible",
args: args{
requested: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
quota: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m")},
},
want: false,
},
{
name: "MemoryElgible",
args: args{
requested: v1.ResourceList{v1.ResourceMemory: resource.MustParse("32Gi")},
quota: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")},
},
want: true,
},
{
name: "MemoryInelgible",
args: args{
requested: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")},
quota: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")},
},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isEligible(tt.args.requested, tt.args.quota); got != tt.want {
t.Errorf("isEligible() = %v, want %v", got, tt.want)
}
})
}
}
16 changes: 10 additions & 6 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
Expand Down Expand Up @@ -224,14 +223,19 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
}

if err != nil && !k8serrors.IsAlreadyExists(err) {
if backoff.IsBackoffError(err) {
if backoff.IsResourceQuotaExceeded(err) && !backoff.IsResourceRequestsEligible(err) {
// if task resources exceed resource quotas then permanently fail because the task will
// be stuck waiting for resources until the `node-active-deadline` terminates the node.
logger.Errorf(ctx, "task resource requests exceed k8s resource limits. err: %v", err)
return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("ResourceRequestsExceedLimits",
fmt.Sprintf("requested resources exceed limits: %v", err.Error()), nil)), nil
} else if stdErrors.IsCausedBy(err, errors.BackOffError) {
logger.Warnf(ctx, "Failed to launch job, resource quota exceeded. err: %v", err)
return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil
} else if e.backOffController == nil && backoff.IsResourceQuotaExceeded(err) {
logger.Warnf(ctx, "Failed to launch job, resource quota exceeded and the operation is not guarded by back-off. err: %v", err)
return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil
} else if k8serrors.IsForbidden(err) {
if e.backOffController == nil && strings.Contains(err.Error(), "exceeded quota") {
logger.Warnf(ctx, "Failed to launch job, resource quota exceeded and the operation is not guarded by back-off. err: %v", err)
return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil
}
return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure("RuntimeFailure", err.Error(), nil)), nil
} else if k8serrors.IsBadRequest(err) || k8serrors.IsInvalid(err) {
logger.Errorf(ctx, "Badly formatted resource for plugin [%s], err %s", e.id, err)
Expand Down

0 comments on commit ca038b5

Please sign in to comment.