Merge pull request #52 from zalando-incubator/fix-bogus-logging
Avoid irritating scaling message
otrosien authored Apr 29, 2019
2 parents 0b6a458 + 6ef037e commit 849c277
Showing 2 changed files with 77 additions and 53 deletions; only the diff for operator/autoscaler.go is reproduced below.
operator/autoscaler.go — 48 changes: 30 additions & 18 deletions
@@ -2,7 +2,9 @@ package operator

import (
    "fmt"
+
    "math"
+
    "time"

    log "github.com/sirupsen/logrus"
@@ -73,7 +75,7 @@ func NewAutoScaler(es *ESResource, metricsInterval time.Duration, esClient *ESCl
    }
}

-func (as *AutoScaler) getScalingDirection() ScalingDirection {
+func (as *AutoScaler) scalingHint() ScalingDirection {
    scaling := as.eds.Spec.Scaling

    // no metrics yet
@@ -106,11 +108,11 @@ func (as *AutoScaler) getScalingDirection() ScalingDirection {
        }
    }

-   requiredScaleupSamples := int(math.Ceil(float64(scaling.ScaleUpThresholdDurationSeconds) / as.metricsInterval.Seconds()))
-   if sampleSize >= requiredScaleupSamples {
+   requiredScaleUpSamples := int(math.Ceil(float64(scaling.ScaleUpThresholdDurationSeconds) / as.metricsInterval.Seconds()))
+   if sampleSize >= requiredScaleUpSamples {
        // check if CPU is above threshold for the last n samples
        scaleUpRequired := true
-       for _, currentItem := range as.esMSet.Metrics[sampleSize-requiredScaleupSamples:] {
+       for _, currentItem := range as.esMSet.Metrics[sampleSize-requiredScaleUpSamples:] {
            if currentItem.Value <= scaling.ScaleUpCPUBoundary {
                scaleUpRequired = false
                break
@@ -130,7 +132,7 @@ func (as *AutoScaler) getScalingDirection() ScalingDirection {
// TODO: check alternative approach by configuring the tags used for `index.routing.allocation`
// and deriving the indices from there.
func (as *AutoScaler) GetScalingOperation() (*ScalingOperation, error) {
-   direction := as.getScalingDirection()
+   direction := as.scalingHint()
    esIndices, err := as.esClient.GetIndices()
    if err != nil {
        return nil, err
@@ -188,19 +190,19 @@ func (as *AutoScaler) getManagedIndices(esIndices []ESIndex, esShards []ESShard)
    return managedIndices
}

-func (as *AutoScaler) calculateScalingOperation(managedIndices map[string]ESIndex, managedNodes []ESNode, direction ScalingDirection) *ScalingOperation {
+func (as *AutoScaler) calculateScalingOperation(managedIndices map[string]ESIndex, managedNodes []ESNode, scalingHint ScalingDirection) *ScalingOperation {
    scalingSpec := as.eds.Spec.Scaling

-   currentDesiredReplicas := as.eds.Spec.Replicas
-   if currentDesiredReplicas == nil {
+   currentDesiredNodeReplicas := as.eds.Spec.Replicas
+   if currentDesiredNodeReplicas == nil {
        return noopScalingOperation("DesiredReplicas is not set yet.")
    }

    if len(managedIndices) == 0 {
        return noopScalingOperation("No indices allocated yet.")
    }

-   scalingOperation := as.scaleUpOrDown(managedIndices, direction, *currentDesiredReplicas)
+   scalingOperation := as.scaleUpOrDown(managedIndices, scalingHint, *currentDesiredNodeReplicas)

    // safety check: ensure we don't scale below minIndexReplicas+1
    if scalingOperation.NodeReplicas != nil && *scalingOperation.NodeReplicas < scalingSpec.MinIndexReplicas+1 {
@@ -241,7 +243,7 @@ func (as *AutoScaler) ensureUpperBoundNodeReplicas(scalingSpec *zv1.Elasticsearc
    return newDesiredNodeReplicas
}

-func (as *AutoScaler) scaleUpOrDown(esIndices map[string]ESIndex, direction ScalingDirection, currentDesiredReplicas int32) *ScalingOperation {
+func (as *AutoScaler) scaleUpOrDown(esIndices map[string]ESIndex, scalingHint ScalingDirection, currentDesiredNodeReplicas int32) *ScalingOperation {
    scalingSpec := as.eds.Spec.Scaling

    currentTotalShards := int32(0)
@@ -250,21 +252,21 @@ func (as *AutoScaler) scaleUpOrDown(esIndices map[string]ESIndex, direction Scal
        currentTotalShards += index.Primaries * (index.Replicas + 1)
    }

-   currentShardToNodeRatio := float64(currentTotalShards) / float64(currentDesiredReplicas)
+   currentShardToNodeRatio := float64(currentTotalShards) / float64(currentDesiredNodeReplicas)

    // independent of the scaling direction: in case the scaling settings have changed (e.g. the MaxShardsPerNode), we might need to scale up.
    if currentShardToNodeRatio > float64(scalingSpec.MaxShardsPerNode) {
        newDesiredNodeReplicas := as.ensureUpperBoundNodeReplicas(scalingSpec, int32(math.Ceil(float64(currentTotalShards)/float64(scalingSpec.MaxShardsPerNode))))
        return &ScalingOperation{
-           ScalingDirection: UP,
+           ScalingDirection: as.calculateScalingDirection(currentDesiredNodeReplicas, newDesiredNodeReplicas),
            NodeReplicas: &newDesiredNodeReplicas,
            Description: fmt.Sprintf("Current shard-to-node ratio (%.2f) exceeding the desired limit of (%d).", currentShardToNodeRatio, scalingSpec.MaxShardsPerNode),
        }
    }

    newDesiredIndexReplicas := make([]ESIndex, 0, len(esIndices))

-   switch direction {
+   switch scalingHint {
    case UP:
        if currentShardToNodeRatio <= float64(scalingSpec.MinShardsPerNode) {
            newTotalShards := currentTotalShards
@@ -286,15 +288,15 @@ func (as *AutoScaler) scaleUpOrDown(esIndices map[string]ESIndex, direction Scal
                    Description: fmt.Sprintf("Keeping shard-to-node ratio (%.2f), and increasing index replicas.", currentShardToNodeRatio),
                    NodeReplicas: &newDesiredNodeReplicas,
                    IndexReplicas: newDesiredIndexReplicas,
-                   ScalingDirection: direction,
+                   ScalingDirection: as.calculateScalingDirection(currentDesiredNodeReplicas, newDesiredNodeReplicas),
                }
            }
        }

        newDesiredNodeReplicas := as.ensureUpperBoundNodeReplicas(scalingSpec, int32(math.Ceil(float64(currentTotalShards)/float64(currentShardToNodeRatio-1))))

        return &ScalingOperation{
-           ScalingDirection: direction,
+           ScalingDirection: as.calculateScalingDirection(currentDesiredNodeReplicas, newDesiredNodeReplicas),
            NodeReplicas: &newDesiredNodeReplicas,
            Description: fmt.Sprintf("Increasing node replicas to %d.", newDesiredNodeReplicas),
        }
@@ -313,25 +315,35 @@ func (as *AutoScaler) scaleUpOrDown(esIndices map[string]ESIndex, direction Scal
        if newTotalShards != currentTotalShards {
            newDesiredNodeReplicas := as.ensureLowerBoundNodeReplicas(scalingSpec, int32(math.Ceil(float64(newTotalShards)/float64(currentShardToNodeRatio))))
            return &ScalingOperation{
-               ScalingDirection: direction,
+               ScalingDirection: as.calculateScalingDirection(currentDesiredNodeReplicas, newDesiredNodeReplicas),
                NodeReplicas: &newDesiredNodeReplicas,
                IndexReplicas: newDesiredIndexReplicas,
                Description: fmt.Sprintf("Keeping shard-to-node ratio (%.2f), and decreasing index replicas.", currentShardToNodeRatio),
            }
        }
        // increase shard-to-node ratio, and scale down by at least one
        newDesiredNodeReplicas := as.ensureLowerBoundNodeReplicas(scalingSpec,
-           int32(math.Min(float64(currentDesiredReplicas)-float64(1), math.Ceil(float64(currentTotalShards)/float64(currentShardToNodeRatio+1)))))
+           int32(math.Min(float64(currentDesiredNodeReplicas)-float64(1), math.Ceil(float64(currentTotalShards)/float64(currentShardToNodeRatio+1)))))
        ratio := float64(newTotalShards) / float64(newDesiredNodeReplicas)
        if ratio >= float64(scalingSpec.MaxShardsPerNode) {
            return noopScalingOperation(fmt.Sprintf("Scaling would violate the shard-to-node maximum (%.2f/%d).", ratio, scalingSpec.MaxShardsPerNode))
        }

        return &ScalingOperation{
-           ScalingDirection: direction,
+           ScalingDirection: as.calculateScalingDirection(currentDesiredNodeReplicas, newDesiredNodeReplicas),
            NodeReplicas: &newDesiredNodeReplicas,
            Description: fmt.Sprintf("Decreasing node replicas to %d.", newDesiredNodeReplicas),
        }
    }
    return noopScalingOperation("Nothing to do")
}
+
+func (as *AutoScaler) calculateScalingDirection(oldNodeReplicas, newNodeReplicas int32) ScalingDirection {
+   if newNodeReplicas > oldNodeReplicas {
+       return UP
+   }
+   if newNodeReplicas < oldNodeReplicas {
+       return DOWN
+   }
+   return NONE
+}
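
For context on what the diff achieves: each ScalingOperation now derives its reported ScalingDirection from the actual change in desired node replicas (via the new calculateScalingDirection helper) rather than echoing the CPU-based scaling hint or a hard-coded UP, so an operation that leaves the node count unchanged no longer logs a misleading scale-up or scale-down message. Below is a minimal, self-contained sketch of that derivation; the ScalingDirection type, its constant values, and the replica counts in main are stand-ins assumed for illustration, not the operator's actual definitions.

package main

import "fmt"

// ScalingDirection and its constants are stubbed here for illustration only;
// the real definitions live elsewhere in the operator package and may differ.
type ScalingDirection int

const (
    NONE ScalingDirection = iota
    DOWN
    UP
)

func (d ScalingDirection) String() string {
    switch d {
    case UP:
        return "UP"
    case DOWN:
        return "DOWN"
    default:
        return "NONE"
    }
}

// calculateScalingDirection mirrors the helper introduced in this commit:
// the direction attached to a ScalingOperation (and hence the log message)
// is derived from the actual change in desired node replicas, not from the
// CPU-based scaling hint.
func calculateScalingDirection(oldNodeReplicas, newNodeReplicas int32) ScalingDirection {
    if newNodeReplicas > oldNodeReplicas {
        return UP
    }
    if newNodeReplicas < oldNodeReplicas {
        return DOWN
    }
    return NONE
}

func main() {
    fmt.Println(calculateScalingDirection(4, 6)) // UP
    fmt.Println(calculateScalingDirection(6, 4)) // DOWN
    // The case this commit targets: the computed node count equals the
    // current one, so the operation reports NONE instead of echoing the hint.
    fmt.Println(calculateScalingDirection(4, 4)) // NONE
}

Tying the logged direction to the computed replica delta keeps the message consistent with what the operator is actually about to do, which appears to be why the hard-coded UP in the shard-to-node-ratio branch was replaced as well.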