Skip to content

Commit

Permalink
fix race condition between ca and scheduler
Browse files Browse the repository at this point in the history
Implement dynamically adjustment of NodeDeleteDelayAfterTaint based on round trip time between ca and apiserver
  • Loading branch information
damikag committed Sep 4, 2023
1 parent 893a51b commit 10577b4
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 28 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,7 @@ type AutoscalingOptions struct {
ParallelDrain bool
// NodeGroupSetRatio is a collection of ratios used by CA used to make scaling decisions.
NodeGroupSetRatios NodeGroupDifferenceRatios
// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
}
47 changes: 35 additions & 12 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type Actuator struct {
// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
Expand All @@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
}
}

Expand Down Expand Up @@ -158,20 +160,41 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re
// applied taints are cleaned up.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
var taintedNodes []*apiv1.Node
var updateLatencyTracker *UpdateLatencyTracker
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister(), time.Now)
go updateLatencyTracker.Start()
}
for _, bucket := range NodeGroupViews {
for _, node := range bucket.Nodes {
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.StartTimeChan <- NodeTaintStartTime{node.Name, time.Now()}
}
err := a.taintNode(node)
if err != nil {
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
// Clean up already applied taints in case of issues.
for _, taintedNode := range taintedNodes {
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
close(updateLatencyTracker.AwaitOrStopChan)
}
return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
}
taintedNodes = append(taintedNodes, node)
}
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.AwaitOrStopChan <- true
latency, ok := <-updateLatencyTracker.ResultChan
if ok {
// CA is expected to wait 3 times the round-trip time between CA and the api-server.
// Therefore, the nodeDeleteDelayAfterTaint is set 2 times the latency.
// A delay of one round trip time is implicitly there when measuring the latency.
a.nodeDeleteDelayAfterTaint = 2 * latency
}
}
return nil
}

Expand Down Expand Up @@ -207,9 +230,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
return
}

if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint)
time.Sleep(a.ctx.NodeDeleteDelayAfterTaint)
if a.nodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint)
time.Sleep(a.nodeDeleteDelayAfterTaint)
}

clusterSnapshot, err := a.createSnapshot(nodes)
Expand Down
116 changes: 116 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package actuation

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"
)

const sleepDurationWhenPolling = 50 * time.Millisecond
const waitForTaintingTimeoutDuration = 30 * time.Second

type NodeTaintStartTime struct {
nodeName string
startTime time.Time
}

// UpdateLatencyTracker can be used to calculate round-trip time between CA and api-server
// when adding ToBeDeletedTaint to nodes
type UpdateLatencyTracker struct {
startTimestamp map[string]time.Time
finishTimestamp map[string]time.Time
remainingNodeCount int
nodeLister kubernetes.NodeLister
StartTimeChan chan NodeTaintStartTime
// Passing a bool will wait for all the started nodes to get tainted and calculate
// latency based on latencies observed. (If all the nodes did not get tained within
// waitForTaintingTimeoutDuration after passing a bool, latency calculation will be
// aborted and the ResultChan will be closed without returning a value) Closing the
// AwaitOrStopChan without passing any bool will abort the latency calculation.
AwaitOrStopChan chan bool
ResultChan chan time.Duration
// now is used only to make the testing easier
now func() time.Time
}

func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister, now func() time.Time) *UpdateLatencyTracker {
return &UpdateLatencyTracker{
startTimestamp: map[string]time.Time{},
finishTimestamp: map[string]time.Time{},
remainingNodeCount: 0,
nodeLister: nodeLister,
StartTimeChan: make(chan NodeTaintStartTime),
AwaitOrStopChan: make(chan bool),
ResultChan: make(chan time.Duration),
now: now,
}
}

func (u *UpdateLatencyTracker) Start() {
for {
select {
case _, ok := <-u.AwaitOrStopChan:
if ok {
u.await()
}
return
case ntst := <-u.StartTimeChan:
u.startTimestamp[ntst.nodeName] = ntst.startTime
u.remainingNodeCount += 1
continue
default:
}
u.updateFinishTime()
time.Sleep(sleepDurationWhenPolling)
}
}

func (u *UpdateLatencyTracker) updateFinishTime() {
for nodeName, _ := range u.startTimestamp {
if _, ok := u.finishTimestamp[nodeName]; ok {
continue
}
node, err := u.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("Error getting node: %v", err)
continue
}
if taints.HasToBeDeletedTaint(node) {
u.finishTimestamp[node.Name] = u.now()
u.remainingNodeCount -= 1
}
}
}

func (u *UpdateLatencyTracker) calculateLatency() time.Duration {
var maxLatency time.Duration = 0
for node, startTime := range u.startTimestamp {
endTime, _ := u.finishTimestamp[node]
currentLatency := endTime.Sub(startTime)
if currentLatency > maxLatency {
maxLatency = currentLatency
}
}
return maxLatency
}

func (u *UpdateLatencyTracker) await() {
waitingForTaintingStartTime := time.Now()
for {
switch {
case u.remainingNodeCount == 0:
latency := u.calculateLatency()
u.ResultChan <- latency
return
case time.Now().After(waitingForTaintingStartTime.Add(waitForTaintingTimeoutDuration)):
klog.Errorf("Timeout before tainting all nodes, latency measurement will be stale")
close(u.ResultChan)
return
default:
time.Sleep(sleepDurationWhenPolling)
u.updateFinishTime()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package actuation

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

// mockClock is used to mock time.Now() when testing UpdateLatencyTracker
// For the n th call to Now() it will return a timestamp after duration[n] to
// the startTime if n < the length of durations. Otherwise, it will return current time.
type mockClock struct {
startTime time.Time
durations []time.Duration
index int
mutex sync.Mutex
}

func NewMockClock(startTime time.Time, durations []time.Duration) mockClock {
return mockClock{
startTime: startTime,
durations: durations,
index: 0,
}
}

func (m *mockClock) Now() time.Time {
m.mutex.Lock()
defer m.mutex.Unlock()
var timeToSend time.Time
if m.index < len(m.durations) {
timeToSend = m.startTime.Add(m.durations[m.index])
} else {
timeToSend = time.Now()
}
m.index += 1
return timeToSend
}

func (m *mockClock) getIndex() int {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.index
}

func TestUpdateLatencyCalculation(t *testing.T) {
toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}
n1 := test.BuildTestNode("n1", 100, 100)
n1.Spec.Taints = append(n1.Spec.Taints, toBeDeletedTaint)
n2 := test.BuildTestNode("n2", 100, 100)
n2.Spec.Taints = append(n2.Spec.Taints, toBeDeletedTaint)
n3 := test.BuildTestNode("n3", 100, 100)

testCases := []struct {
description string
startTime time.Time
nodes []*apiv1.Node
durations []time.Duration
wantLatency time.Duration
wantResultChanNotClosed bool
}{
{
description: "latency when tainting a single node",
startTime: time.Now(),
nodes: []*apiv1.Node{n1},
durations: []time.Duration{100 * time.Millisecond},
wantLatency: 100 * time.Millisecond,
wantResultChanNotClosed: true,
},
{
description: "latency when tainting multiple nodes",
startTime: time.Now(),
nodes: []*apiv1.Node{n1, n2},
durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond},
wantLatency: 150 * time.Millisecond,
wantResultChanNotClosed: true,
},
{
description: "Some nodes fails to taint before timeout",
startTime: time.Now(),
nodes: []*apiv1.Node{n1, n3},
durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond},
wantResultChanNotClosed: false,
},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
assert.Equal(t, len(tc.nodes), len(tc.durations))
mc := NewMockClock(tc.startTime, tc.durations)
nodeLister := kubernetes.NewTestNodeLister(tc.nodes)
updateLatencyTracker := NewUpdateLatencyTracker(nodeLister, mc.Now)
go updateLatencyTracker.Start()
for _, node := range tc.nodes {
updateLatencyTracker.StartTimeChan <- NodeTaintStartTime{node.Name, tc.startTime}
}
updateLatencyTracker.AwaitOrStopChan <- true
latency, ok := <-updateLatencyTracker.ResultChan
assert.Equal(t, tc.wantResultChanNotClosed, ok)
if ok {
assert.Equal(t, tc.wantLatency, latency)
assert.Equal(t, len(tc.durations), mc.getIndex())
}
})
}
}
34 changes: 18 additions & 16 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,23 @@ var (
"maxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.")
nodeGroupBackoffResetTimeout = flag.Duration("node-group-backoff-reset-timeout", 3*time.Hour,
"nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.")
maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.")
maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.")
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers")
minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it")
scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.")
parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.")
maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.")
maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.")
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.")
maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.")
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers")
minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it")
scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.")
parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.")
maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.")
maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.")
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", true, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -379,6 +380,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxAllocatableDifferenceRatio: *maxAllocatableDifferenceRatio,
MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
}
}

Expand Down

0 comments on commit 10577b4

Please sign in to comment.