Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Fix exponent overflow #100

Merged
merged 8 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pkg/controller/nodes/task/backoff/atomic_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package backoff

import (
"sync/atomic"
"time"
)

// AtomicTime holds a time.Time inside an atomic.Value so that the timestamp
// can be stored and loaded atomically.
//
// The zero value is ready to use and reports the zero time.Time until the
// first Store. Like the atomic.Value it wraps, an AtomicTime must not be
// copied after first use.
type AtomicTime struct {
	v atomic.Value
}

// Load returns the most recently stored time.Time. When nothing has been
// stored yet (a zero-value AtomicTime) it returns the zero time.Time.
func (a *AtomicTime) Load() time.Time {
	// An empty atomic.Value loads as nil; the comma-ok assertion turns that
	// into the zero time.Time instead of panicking.
	t, _ := a.v.Load().(time.Time)
	return t
}

// Store atomically replaces the held time.Time with t.
func (a *AtomicTime) Store(t time.Time) {
	a.v.Store(t)
}

// NewAtomicTime returns an AtomicTime initialised to t.
//
// The result is returned by value to keep the existing signature; callers
// should place it in its final location and not copy it afterwards.
func NewAtomicTime(t time.Time) AtomicTime {
	// Store directly into the value being returned rather than building a
	// separate atomic.Value and copying it into the struct after first use.
	var a AtomicTime
	a.v.Store(t)
	return a
}
27 changes: 6 additions & 21 deletions pkg/controller/nodes/task/backoff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

stdAtomic "github.com/lyft/flytestdlib/atomic"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
Expand All @@ -24,8 +26,8 @@ func (m *Controller) GetOrCreateHandler(ctx context.Context, key string, backOff
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: m.Clock,
BackOffBaseSecond: backOffBaseSecond,
BackOffExponent: 0,
NextEligibleTime: m.Clock.Now(),
BackOffExponent: stdAtomic.NewUint32(0),
NextEligibleTime: NewAtomicTime(m.Clock.Now()),
MaxBackOffDuration: maxBackOffDuration,
}, ComputeResourceCeilings: &ComputeResourceCeilings{
computeResourceCeilings: v1.ResourceList{},
Expand All @@ -37,35 +39,18 @@ func (m *Controller) GetOrCreateHandler(ctx context.Context, key string, backOff
} else {
logger.Infof(ctx, "The back-off handler for [%v] has been created.\n", key)
}

if ret, casted := h.(*ComputeResourceAwareBackOffHandler); casted {
return ret
}

return nil
}

// GetBackOffHandler looks up the back-off handler registered under key in the
// controller's handler map. The boolean is the lookup result reported by the
// map's Get.
func (m *Controller) GetBackOffHandler(key string) (*ComputeResourceAwareBackOffHandler, bool) {
	handler, found := m.backOffHandlerMap.Get(key)
	return handler, found
}

// CreateBackOffHandler stores a new back-off handler under key, reads the
// entry for key back from the handler map, resets it, logs the creation, and
// returns it.
//
// The new handler starts with exponent 0, a next-eligible time of the
// controller clock's current time, and an empty set of resource ceilings.
func (m *Controller) CreateBackOffHandler(ctx context.Context, key string, backOffBaseSecond int, maxBackOffDuration time.Duration) *ComputeResourceAwareBackOffHandler {
	blocker := &SimpleBackOffBlocker{
		Clock:              m.Clock,
		BackOffBaseSecond:  backOffBaseSecond,
		BackOffExponent:    0,
		NextEligibleTime:   m.Clock.Now(),
		MaxBackOffDuration: maxBackOffDuration,
	}
	ceilings := &ComputeResourceCeilings{
		computeResourceCeilings: v1.ResourceList{},
	}
	m.backOffHandlerMap.Set(key, &ComputeResourceAwareBackOffHandler{
		SimpleBackOffBlocker:    blocker,
		ComputeResourceCeilings: ceilings,
	})

	// Return what the map holds for key rather than the local pointer.
	created, _ := m.backOffHandlerMap.Get(key)
	created.reset()
	logger.Infof(ctx, "The back-off handler for [%v] has been created.\n", key)
	return created
}

// ComposeResourceKey builds the back-off handler key for a resource: the
// string form of its GroupVersionKind and its namespace, joined by a comma.
func ComposeResourceKey(o k8s.Resource) string {
	gvk := o.GroupVersionKind().String()
	namespace := o.GetNamespace()
	return fmt.Sprintf("%v,%v", gvk, namespace)
}
Expand Down
30 changes: 19 additions & 11 deletions pkg/controller/nodes/task/backoff/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/lyft/flyteplugins/go/tasks/errors"
stdAtomic "github.com/lyft/flytestdlib/atomic"
stdErrors "github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/api/core/v1"
Expand All @@ -24,33 +25,40 @@ var (
// SimpleBackOffBlocker tracks an exponential back-off window.
//
// Each back-off lasts BackOffBaseSecond^BackOffExponent seconds, capped at
// MaxBackOffDuration, and the blocker reports itself as blocking until
// NextEligibleTime has passed.
type SimpleBackOffBlocker struct {
	// Clock supplies the current time when the window is (re)computed.
	Clock clock.Clock
	// BackOffBaseSecond is the base of the exponential, in seconds.
	BackOffBaseSecond int
	// MaxBackOffDuration is the upper bound for a single back-off.
	MaxBackOffDuration time.Duration

	// Mutable fields. They are declared once, as atomic types: the stale
	// plain int / time.Time declarations of the same names are dropped,
	// since duplicate field names do not compile.
	BackOffExponent  stdAtomic.Uint32
	NextEligibleTime AtomicTime
}

// isBlocking reports whether the blocker is still blocking at time t, i.e.
// whether NextEligibleTime is not before t.
func (b *SimpleBackOffBlocker) isBlocking(t time.Time) bool {
	// NextEligibleTime is an AtomicTime, so it must be read through Load.
	return !b.NextEligibleTime.Load().Before(t)
}

// getBlockExpirationTime returns the currently stored NextEligibleTime.
func (b *SimpleBackOffBlocker) getBlockExpirationTime() time.Time {
	// NextEligibleTime is an AtomicTime; Load yields the time.Time it holds.
	return b.NextEligibleTime.Load()
}

// reset returns the blocker to its initial state: the exponent goes back to 0
// and NextEligibleTime is set to the clock's current time.
func (b *SimpleBackOffBlocker) reset() {
	// Both fields are atomic types, so they are written through Store rather
	// than by plain assignment.
	b.BackOffExponent.Store(0)
	b.NextEligibleTime.Store(b.Clock.Now())
}

func (b *SimpleBackOffBlocker) backOff() time.Duration {
backOffDuration := time.Duration(time.Second.Nanoseconds() * int64(math.Pow(float64(b.BackOffBaseSecond), float64(b.BackOffExponent))))
// backOff computes the next back-off duration, stores the time at which the
// blocker becomes eligible again, and returns the duration.
//
// The duration is BackOffBaseSecond^BackOffExponent seconds, capped at
// MaxBackOffDuration. The exponent is incremented only while the computed
// duration does not exceed the cap, so once the back-off saturates the
// exponent stops growing however many more times backOff is called.
func (b *SimpleBackOffBlocker) backOff(ctx context.Context) time.Duration {
	exponent := b.BackOffExponent.Load()

	// Debugf, not Debug: the message carries format verbs. The loaded
	// exponent is logged rather than the atomic wrapper.
	logger.Debugf(ctx, "BackOff params [BackOffBaseSecond: %v] [BackOffExponent: %v] [MaxBackOffDuration: %v]",
		b.BackOffBaseSecond, exponent, b.MaxBackOffDuration)

	backOffSeconds := math.Pow(float64(b.BackOffBaseSecond), float64(exponent))

	// Compare against the cap while still in floating point: converting a
	// huge power to int64 and multiplying by a second's worth of nanoseconds
	// could overflow and make a saturated back-off look small.
	backOffDuration := b.MaxBackOffDuration
	if backOffSeconds <= b.MaxBackOffDuration.Seconds() {
		backOffDuration = time.Duration(backOffSeconds) * time.Second
		b.BackOffExponent.Inc()
	}

	b.NextEligibleTime.Store(b.Clock.Now().Add(backOffDuration))
	return backOffDuration
}

Expand Down Expand Up @@ -142,7 +150,7 @@ func (h *ComputeResourceAwareBackOffHandler) Handle(ctx context.Context, operati
// if the backOffBlocker is not blocking and we are still encountering insufficient resource issue,
// we should increase the exponent in the backoff and update the NextEligibleTime

backOffDuration := h.SimpleBackOffBlocker.backOff()
backOffDuration := h.SimpleBackOffBlocker.backOff(ctx)
logger.Infof(ctx, "The operation was attempted because the back-off handler is not blocking, but failed due to "+
"insufficient resource (backing off for a duration of [%v] to timestamp [%v])\n", backOffDuration, h.SimpleBackOffBlocker.NextEligibleTime)
} else {
Expand Down
54 changes: 37 additions & 17 deletions pkg/controller/nodes/task/backoff/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"testing"
"time"

stdAtomic "github.com/lyft/flytestdlib/atomic"

"github.com/magiconair/properties/assert"

taskErrors "github.com/lyft/flyteplugins/go/tasks/errors"
stdlibErrors "github.com/lyft/flytestdlib/errors"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -48,7 +52,7 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
args args
wantErr bool
wantErrCode stdlibErrors.ErrorCode
wantExp int
wantExp uint32
wantNextEligibleTime time.Time
wantCeilings v1.ResourceList
wantCallCount int
Expand All @@ -59,8 +63,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: 1,
NextEligibleTime: tc.Now().Add(time.Second * 7),
BackOffExponent: stdAtomic.NewUint32(1),
NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * 7)),
MaxBackOffDuration: 10 * time.Second,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
Expand All @@ -84,8 +88,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: 1,
NextEligibleTime: tc.Now().Add(time.Second * 7),
BackOffExponent: stdAtomic.NewUint32(1),
NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * 7)),
MaxBackOffDuration: 10 * time.Second,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
Expand All @@ -111,8 +115,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: 1,
NextEligibleTime: tc.Now().Add(time.Second * -2),
BackOffExponent: stdAtomic.NewUint32(1),
NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * -2)),
MaxBackOffDuration: 10 * time.Second,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
Expand Down Expand Up @@ -147,10 +151,10 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
t.Errorf("Handle() errorCode = %v, wantErrCode %v", ec, tt.wantErrCode)
}
}
if tt.wantExp != h.BackOffExponent {
if tt.wantExp != h.BackOffExponent.Load() {
t.Errorf("post-Handle() BackOffExponent = %v, wantBackOffExponent %v", h.BackOffExponent, tt.wantExp)
}
if tt.wantNextEligibleTime != h.NextEligibleTime {
if tt.wantNextEligibleTime != h.NextEligibleTime.Load() {
t.Errorf("post-Handle() NextEligibleTime = %v, wantNextEligibleTime %v", h.NextEligibleTime, tt.wantNextEligibleTime)
}
if !reflect.DeepEqual(h.computeResourceCeilings, tt.wantCeilings) {
Expand Down Expand Up @@ -428,14 +432,14 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) {
type fields struct {
Clock clock.Clock
BackOffBaseSecond int
BackOffExponent int
BackOffExponent uint32
NextEligibleTime time.Time
MaxBackOffDuration time.Duration
}
tests := []struct {
name string
fields fields
wantExponent int
wantExponent uint32
wantDuration time.Duration
}{
{name: "backoff should increase exponent",
Expand All @@ -450,7 +454,7 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) {
},
{name: "backoff should saturate",
fields: fields{Clock: tc, BackOffBaseSecond: 2, BackOffExponent: 10, NextEligibleTime: tc.Now(), MaxBackOffDuration: maxBackOffDuration},
wantExponent: 11,
wantExponent: 10,
wantDuration: maxBackOffDuration,
},
}
Expand All @@ -459,17 +463,33 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) {
b := &SimpleBackOffBlocker{
Clock: tt.fields.Clock,
BackOffBaseSecond: tt.fields.BackOffBaseSecond,
BackOffExponent: tt.fields.BackOffExponent,
NextEligibleTime: tt.fields.NextEligibleTime,
BackOffExponent: stdAtomic.NewUint32(tt.fields.BackOffExponent),
NextEligibleTime: NewAtomicTime(tt.fields.NextEligibleTime),
MaxBackOffDuration: tt.fields.MaxBackOffDuration,
}

if got := b.backOff(); !reflect.DeepEqual(got, tt.wantDuration) {
if got := b.backOff(context.Background()); !reflect.DeepEqual(got, tt.wantDuration) {
t.Errorf("backOff() = %v, want %v", got, tt.wantDuration)
}
if gotExp := b.BackOffExponent; !reflect.DeepEqual(gotExp, tt.wantExponent) {
t.Errorf("backOffExponent = %v, want %v", gotExp, tt.wantExponent)
if gotExp := b.BackOffExponent; !reflect.DeepEqual(gotExp.Load(), tt.wantExponent) {
t.Errorf("backOffExponent = %v, want %v", gotExp.Load(), tt.wantExponent)
}
})
}

t.Run("backoff many times after maxBackOffDuration is hit", func(t *testing.T) {
b := &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: stdAtomic.NewUint32(10),
NextEligibleTime: NewAtomicTime(tc.Now()),
MaxBackOffDuration: maxBackOffDuration,
}

for i := 0; i < 10; i++ {
backOffDuration := b.backOff(context.Background())
assert.Equal(t, maxBackOffDuration, backOffDuration)
assert.Equal(t, uint32(10), b.BackOffExponent.Load())
}
})
}
2 changes: 1 addition & 1 deletion pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
refKey := backoff.ComposeResourceKey(referenceResource)
podBackOffHandler, found := backOffController.GetBackOffHandler(refKey)
assert.True(t, found)
assert.Equal(t, 1, podBackOffHandler.BackOffExponent)
assert.Equal(t, uint32(1), podBackOffHandler.BackOffExponent.Load())
})
}

Expand Down