diff --git a/CHANGELOG.md b/CHANGELOG.md
index b9f7917a703..4b842eb4544 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -92,6 +92,7 @@ Here is an overview of all new **experimental** features:
 
 ### Fixes
 
+- **General**: Scalers cache uses a mutex to prevent concurrent actions ([#6273](https://github.com/kedacore/keda/issues/6273))
 - **AWS Secret Manager**: Pod identity overrides are honored ([#6195](https://github.com/kedacore/keda/issues/6195))
 - **AWS SQS Scaler**: Improve error handling for SQS queue metrics ([#6178](https://github.com/kedacore/keda/issues/6178))
 - **Azure Event Hub Scaler**: Checkpointer errors are correctly handled ([#6084](https://github.com/kedacore/keda/issues/6084))
diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go
index acd253804c4..332a419b504 100644
--- a/pkg/scaling/cache/scalers_cache.go
+++ b/pkg/scaling/cache/scalers_cache.go
@@ -19,6 +19,7 @@ package cache
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/expr-lang/expr/vm"
@@ -40,6 +41,7 @@ type ScalersCache struct {
 	ScalableObjectGeneration int64
 	Recorder                 record.EventRecorder
 	CompiledFormula          *vm.Program
+	mutex                    sync.RWMutex // guards Scalers
 }
 
 type ScalerBuilder struct {
@@ -50,6 +52,9 @@ type ScalerBuilder struct {
 
 // GetScalers returns array of scalers and scaler config stored in the cache
 func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalersconfig.ScalerConfig) {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+
 	scalersList := make([]scalers.Scaler, 0, len(c.Scalers))
 	configsList := make([]scalersconfig.ScalerConfig, 0, len(c.Scalers))
 	for _, s := range c.Scalers {
@@ -60,6 +65,21 @@ func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalersconfig.ScalerCon
 	return scalersList, configsList
 }
 
+// getScalerBuilder returns the ScalerBuilder stored in the cache at index.
+// It returns an error when index is outside the bounds of c.Scalers.
+func (c *ScalersCache) getScalerBuilder(index int) (ScalerBuilder, error) {
+	// Take the read lock before the bounds check so that the length that is
+	// validated is the length of the slice that is indexed, even if Close or
+	// refreshScaler mutates c.Scalers concurrently.
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+
+	if index < 0 || index >= len(c.Scalers) {
+		return ScalerBuilder{}, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
+	}
+	return c.Scalers[index], nil
+}
+
 // GetPushScalers returns array of push scalers stored in the cache
 func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
 	var result []scalers.PushScaler
@@ -73,8 +93,10 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
 
 // Close closes all scalers in the cache
 func (c *ScalersCache) Close(ctx context.Context) {
+	c.mutex.Lock()
 	scalers := c.Scalers
 	c.Scalers = nil
+	c.mutex.Unlock()
 	for _, s := range scalers {
 		err := s.Scaler.Close(ctx)
 		if err != nil {
@@ -85,6 +107,8 @@ func (c *ScalersCache) Close(ctx context.Context) {
 
 // GetMetricSpecForScaling returns metrics specs for all scalers in the cache
 func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2.MetricSpec {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
 	var spec []v2.MetricSpec
 	for _, s := range c.Scalers {
 		spec = append(spec, s.Scaler.GetMetricSpecForScaling(ctx)...)
@@ -96,12 +120,12 @@ func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2.MetricS
 func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, index int) ([]v2.MetricSpec, error) {
 	var err error
 
-	scalersList, _ := c.GetScalers()
-	if index < 0 || index >= len(scalersList) {
-		return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
+	sb, err := c.getScalerBuilder(index)
+	if err != nil {
+		return nil, err
 	}
 
-	metricSpecs := scalersList[index].GetMetricSpecForScaling(ctx)
+	metricSpecs := sb.Scaler.GetMetricSpecForScaling(ctx)
 
 	// no metric spec returned for a scaler -> this could signal error during connection to the scaler
 	// usually in case this is an external scaler
@@ -123,11 +147,12 @@ func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, ind
 // GetMetricsAndActivityForScaler returns metric value, activity and latency for a scaler identified by the metric name
 // and by the input index (from the list of scalers in this ScaledObject)
 func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, time.Duration, error) {
-	if index < 0 || index >= len(c.Scalers) {
-		return nil, false, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
+	sb, err := c.getScalerBuilder(index)
+	if err != nil {
+		return nil, false, -1, err
 	}
 	startTime := time.Now()
-	metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
+	metric, activity, err := sb.Scaler.GetMetricsAndActivity(ctx, metricName)
 	if err == nil {
 		return metric, activity, time.Since(startTime), nil
 	}
@@ -141,26 +166,35 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index
 	return metric, activity, time.Since(startTime), err
 }
 
-func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {
-	if id < 0 || id >= len(c.Scalers) {
-		return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
+// refreshScaler rebuilds the scaler stored at index with its Factory, stores
+// the new instance in the cache and closes the instance it replaces. It
+// returns the new scaler, or an error when index is out of bounds or the
+// Factory fails; in both cases the cache is left untouched.
+func (c *ScalersCache) refreshScaler(ctx context.Context, index int) (scalers.Scaler, error) {
+	// Hold the write lock for the whole lookup-build-swap sequence so the
+	// builder that is closed below is exactly the one that gets replaced.
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	if index < 0 || index >= len(c.Scalers) {
+		return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
 	}
 
-	sb := c.Scalers[id]
-	defer sb.Scaler.Close(ctx)
-	ns, sConfig, err := sb.Factory()
+	oldSb := c.Scalers[index]
+	newScaler, sConfig, err := oldSb.Factory()
 	if err != nil {
 		return nil, err
 	}
 
-	if id < 0 || id >= len(c.Scalers) {
-		return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
-	}
-	c.Scalers[id] = ScalerBuilder{
-		Scaler:       ns,
+	c.Scalers[index] = ScalerBuilder{
+		Scaler:       newScaler,
 		ScalerConfig: *sConfig,
-		Factory:      sb.Factory,
+		Factory:      oldSb.Factory,
 	}
 
-	return ns, nil
+	// Best effort: the replacement is already installed, so a failure to
+	// close the stale scaler must not fail the refresh.
+	_ = oldSb.Scaler.Close(ctx)
+
+	return newScaler, nil
 }