Allow delayed downscale of a subset of pods (#156)
* When checking the downscale delay of a statefulset, allow a partial downscale when some pods at the end of the statefulset are already past their delay (see the sketch below).

* CHANGELOG.md
pstibrany authored Jun 17, 2024
1 parent 6b427ca commit 5dce3cc
Showing 4 changed files with 93 additions and 40 deletions.
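A minimal, self-contained Go sketch of the idea behind this change (illustrative only; `allowedReplicas` and `elapsedByReplica` are hypothetical names, not the controller's types). Starting from the highest pod index, it walks down towards the desired replica count and stops at the first pod whose downscale delay has not yet elapsed, so only that contiguous suffix of pods is removed. The controller's `checkScalingDelay` in the diff below applies the same loop to the prepare-delayed-downscale timestamps.

```go
package main

import (
	"fmt"
	"time"
)

// allowedReplicas walks pod indexes from the highest one down towards the
// desired count and stops at the first pod whose downscale delay has not yet
// elapsed. It returns the replica count the statefulset can be shrunk to now.
// Hypothetical helper for illustration only.
func allowedReplicas(current, desired int32, delay time.Duration, elapsedByReplica map[int32]time.Duration) int32 {
	allowed := current
	for replica := current - 1; replica >= desired; replica-- {
		elapsed, ok := elapsedByReplica[replica]
		if !ok || elapsed < delay {
			break // this pod (and every lower index) must keep waiting
		}
		allowed--
	}
	return allowed
}

func main() {
	// Mirrors the new test case: pods 4 and 3 are past the 1h delay, pod 2 is
	// not, so only a partial downscale from 5 replicas to 3 is allowed.
	elapsed := map[int32]time.Duration{
		4: 70 * time.Minute,
		3: 75 * time.Minute,
		2: 30 * time.Minute,
		1: 75 * time.Minute,
	}
	fmt.Println(allowedReplicas(5, 1, time.Hour, elapsed)) // prints 3
}
```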
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [ENHANCEMENT] Include unique IDs of webhook requests in logs for easier debugging. #150
* [ENHANCEMENT] Include k8s operation username in request debug logs. #152
* [ENHANCEMENT] `rollout-max-unavailable` annotation can now be specified as percentage, e.g.: `rollout-max-unavailable: 25%`. Resulting value is computed as `floor(replicas * percentage)`, but is never less than 1. #153
* [ENHANCEMENT] Delayed downscale of a statefulset can now reduce replicas earlier, if a subset of pods at the end of the statefulset has already reached its delay. #156
* [BUGFIX] Fix a mangled error log in controller's delayed downscale code. #154

## v0.16.0
30 changes: 27 additions & 3 deletions pkg/controller/controller_test.go
@@ -655,15 +655,15 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
},
},

"scale down is not allowed if delay time was not reached on one pod": {
"scale down is not allowed if delay time was not reached on one pod at the end of statefulset": {
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-b", withReplicas(5, 5),
withMirrorReplicasAnnotations("test", customResourceGVK),
withDelayedDownscaleAnnotations(time.Hour, "http://pod/prepare-delayed-downscale")),
},
httpResponses: map[string]httpResponse{
"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-70*time.Minute).Unix())},
"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Unix())},
"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Unix())},
"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-70*time.Minute).Unix())},
},
customResourceScaleSpecReplicas: 3, // We want to downscale to 3 replicas only.
customResourceScaleStatusReplicas: 5,
@@ -676,6 +676,30 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
},
},

"limited scale down by 2 replicas is allowed if delay time was reached on some pods at the end of statefulset": {
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-b", withReplicas(5, 5),
withMirrorReplicasAnnotations("test", customResourceGVK),
withDelayedDownscaleAnnotations(time.Hour, "http://pod/prepare-delayed-downscale")),
},
httpResponses: map[string]httpResponse{
"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-70*time.Minute).Unix())},
"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-75*time.Minute).Unix())},
"POST http://ingester-zone-b-2.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-30*time.Minute).Unix())}, // cannot be scaled down yet, as 1h has not elapsed
"POST http://ingester-zone-b-1.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-75*time.Minute).Unix())},
},
customResourceScaleSpecReplicas: 1, // We want to downscale to single replica
customResourceScaleStatusReplicas: 5,
expectedPatchedSets: map[string][]string{"ingester-zone-b": {`{"spec":{"replicas":3}}`}}, // Scaledown by 2 replicas (from 5 to 3) is allowed.
expectedHttpRequests: []string{
"DELETE http://ingester-zone-b-0.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
"POST http://ingester-zone-b-1.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
"POST http://ingester-zone-b-2.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
},
},

"scale down is not allowed, if POST returns non-200 HTTP status code, even if returned timestamps are outside of delay": {
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-b", withReplicas(5, 5),
30 changes: 19 additions & 11 deletions pkg/controller/custom_resource_replicas.go
@@ -34,37 +34,45 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx

referenceResource := fmt.Sprintf("%s/%s", referenceGVR.Resource, referenceName)

desiredReplicas := scaleObj.Spec.Replicas
if currentReplicas == desiredReplicas {
updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, desiredReplicas)
cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, desiredReplicas)
referenceResourceDesiredReplicas := scaleObj.Spec.Replicas
if currentReplicas == referenceResourceDesiredReplicas {
updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas)
cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas)
// No change in the number of replicas: don't log because this will be the result most of the time.
continue
}

// We're going to change number of replicas on the statefulset.
// If there is delayed downscale configured on the statefulset, we will first handle delay part, and only if that succeeds,
// continue with downscaling or upscaling.
if err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, desiredReplicas); err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check", "group", groupName, "name", sts.GetName(), "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas, "err", err)
desiredReplicas, err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
if err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
"group", groupName,
"name", sts.GetName(),
"currentReplicas", currentReplicas,
"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
"err", err,
)

updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, currentReplicas)
// If delay has not been reached, we can check next statefulset.
continue
}

direction := ""
logMsg := ""
if desiredReplicas > currentReplicas {
direction = "up"
logMsg = "scaling up statefulset to match replicas in the reference resource"
} else if desiredReplicas < currentReplicas {
direction = "down"
logMsg = "scaling down statefulset to computed desired replicas, based on replicas in the reference resource and elapsed downscale delays"
}

level.Info(c.logger).Log("msg", fmt.Sprintf("scaling %s statefulset to match reference resource", direction),
level.Info(c.logger).Log("msg", logMsg,
"group", groupName,
"name", sts.GetName(),
"currentReplicas", currentReplicas,
"desiredReplicas", desiredReplicas,
"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
"computedDesiredReplicas", desiredReplicas,
"referenceResource", referenceResource,
)

72 changes: 46 additions & 26 deletions pkg/controller/delay.go
@@ -36,21 +36,27 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
}

func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) error {
// Checks if downscale delay has been reached on replicas in [desiredReplicas, currentReplicas) range.
// If there is a range of replicas at the end of statefulset for which delay has been reached, this function
// returns updated desired replicas that statefulset can be scaled to.
func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) (updatedDesiredReplicas int32, _ error) {
if currentReplicas == desiredReplicas {
// should not happen
return nil
return currentReplicas, nil
}

delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
if delay == 0 || prepareURL == nil || err != nil {
return err
if err != nil {
return currentReplicas, err
}
if delay == 0 || prepareURL == nil {
return desiredReplicas, err
}

if desiredReplicas >= currentReplicas {
callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(currentReplicas), prepareURL))
// Proceed even if calling cancel of delayed downscale fails. We call cancellation repeatedly, so it will happen during next reconcile.
return nil
return desiredReplicas, nil
}

{
@@ -61,19 +61,34 @@ func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulS

// Replicas in [desired, current) interval are going to be stopped.
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), int(desiredReplicas), int(currentReplicas), prepareURL)
maxPrepareTime, err := callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx, logger, httpClient, downscaleEndpoints)
elapsedTimeSinceDownscaleInitiated, err := callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx, logger, httpClient, downscaleEndpoints)
if err != nil {
return fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
}

// Find how many pods from the end of statefulset we can already scale down
allowedDesiredReplicas := currentReplicas
for replica := currentReplicas - 1; replica >= desiredReplicas; replica-- {
elapsed, ok := elapsedTimeSinceDownscaleInitiated[int(replica)]
if !ok {
break
}

if elapsed < delay {
break
}

// We can scale down this replica
allowedDesiredReplicas--
}

elapsedSinceMaxTime := time.Since(maxPrepareTime)
if elapsedSinceMaxTime < delay {
return fmt.Errorf("configured downscale delay %v has not been reached for all pods. elapsed time: %v", delay, elapsedSinceMaxTime)
if allowedDesiredReplicas == currentReplicas {
return currentReplicas, fmt.Errorf("configured downscale delay %v has not been reached for any pods at the end of statefulset replicas range", delay)
}

// We can proceed with downscale!
level.Info(logger).Log("msg", "downscale delay has been reached on all downscaled pods", "name", sts.GetName(), "delay", delay, "elapsed", elapsedSinceMaxTime)
return nil
// We can proceed with downscale on at least one pod.
level.Info(logger).Log("msg", "downscale delay has been reached on some downscaled pods", "name", sts.GetName(), "delay", delay, "originalDesiredReplicas", desiredReplicas, "allowedDesiredReplicas", allowedDesiredReplicas)
return allowedDesiredReplicas, nil
}

func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) {
@@ -106,7 +127,7 @@ type endpoint struct {
namespace string
podName string
url url.URL
index int
replica int
}

// Create prepare-downscale endpoints for pods with index in [from, to) range. URL is fully reused except for host, which is replaced with pod's FQDN.
@@ -122,7 +143,7 @@ func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int
ep := endpoint{
namespace: namespace,
podName: fmt.Sprintf("%v-%v", serviceName, index),
index: index,
replica: index,
}

ep.url = *url
@@ -134,14 +155,14 @@ func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int
return eps
}

func callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (time.Time, error) {
func callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (map[int]time.Duration, error) {
if len(endpoints) == 0 {
return time.Time{}, fmt.Errorf("no endpoints")
return nil, fmt.Errorf("no endpoints")
}

var (
maxTimeMu sync.Mutex
maxTime time.Time
timestampsMu sync.Mutex
timestamps = map[int]time.Duration{}
)

type expectedResponse struct {
@@ -195,19 +216,18 @@ func callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx context.Context, logge
}

t := time.Unix(r.Timestamp, 0)
elapsed := time.Since(t)

maxTimeMu.Lock()
if t.After(maxTime) {
maxTime = t
}
maxTimeMu.Unlock()
timestampsMu.Lock()
timestamps[ep.replica] = elapsed
timestampsMu.Unlock()

level.Debug(epLogger).Log("msg", "HTTP POST request to endpoint succeded", "timestamp", t.UTC().Format(time.RFC3339))
level.Debug(epLogger).Log("msg", "HTTP POST request to endpoint succeded", "timestamp", t.UTC().Format(time.RFC3339), "elapsed", elapsed)
return nil
})
}
err := g.Wait()
return maxTime, err
return timestamps, err
}

func callCancelDelayedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) {
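The other half of the change in delay.go above is that the prepare helper now returns per-replica elapsed durations instead of a single maximum timestamp. A sketch of that fan-out pattern, assuming an errgroup-style concurrent fan-out as the surrounding code suggests: one call per replica, results written into a map keyed by replica index under a mutex. `collectElapsed` and `fetch` are illustrative names, not the controller's API; the real code issues HTTP POSTs to the prepare-delayed-downscale endpoints.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
)

// collectElapsed calls fetch once per replica concurrently and records how
// much time has passed since each replica's prepare timestamp. The map is
// guarded by a mutex because the goroutines write to it in parallel.
func collectElapsed(ctx context.Context, replicas []int, fetch func(ctx context.Context, replica int) (time.Time, error)) (map[int]time.Duration, error) {
	var (
		mu      sync.Mutex
		elapsed = map[int]time.Duration{}
	)

	g, ctx := errgroup.WithContext(ctx)
	for _, replica := range replicas {
		replica := replica // capture loop variable (needed before Go 1.22)
		g.Go(func() error {
			ts, err := fetch(ctx, replica) // stands in for the HTTP POST to the prepare endpoint
			if err != nil {
				return err
			}
			mu.Lock()
			elapsed[replica] = time.Since(ts)
			mu.Unlock()
			return nil
		})
	}
	err := g.Wait()
	return elapsed, err
}

func main() {
	// Fake prepare timestamps: replica N initiated its downscale N hours ago.
	out, err := collectElapsed(context.Background(), []int{1, 2, 3, 4},
		func(_ context.Context, replica int) (time.Time, error) {
			return time.Now().Add(-time.Duration(replica) * time.Hour), nil
		})
	fmt.Println(out, err)
}
```

The per-replica map is what lets the suffix scan sketched earlier decide how far the statefulset can shrink right now, instead of blocking the whole downscale on the most recently prepared pod.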
