Skip to content

Commit

Permalink
Add thread safety to support multiple concurrent reconciles (#662)
Browse files Browse the repository at this point in the history
* update common/config.go

* Init Config once

* Set MaxConcurrentReconciles to 1

* Init common config in util_test.go

* Support using sync.Map in common Config

* Revert ParseFloat error checking

* Add helper to load from sync.Map common Config

* Set to 8 max concurrent reconciles for debug

* Support debug logging levels with sync.Map

* Simplify config call for CM certs

* Pull reconciler.go changes
  • Loading branch information
kabicin authored Nov 22, 2024
1 parent da35c4f commit a2f803d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 48 deletions.
78 changes: 47 additions & 31 deletions common/config.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package common

import (
uberzap "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"errors"
"strconv"
"sync"

uberzap "go.uber.org/zap"
"go.uber.org/zap/zapcore"

corev1 "k8s.io/api/core/v1"
)

// OpConfig stored operator configuration
type OpConfig map[string]string

const (

// OpConfigDefaultHostname a DNS name to be used for hostname generation.
Expand Down Expand Up @@ -54,14 +53,18 @@ const (
)

// Config stores operator configuration
var Config = OpConfig{}
var Config *sync.Map

func init() {
Config = &sync.Map{}
}

var LevelFunc = uberzap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= Config.GetZapLogLevel()
return lvl >= GetZapLogLevel(Config)
})

var StackLevelFunc = uberzap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
configuredLevel := Config.GetZapLogLevel()
configuredLevel := GetZapLogLevel(Config)
if configuredLevel > zapcore.DebugLevel {
// No stack traces unless fine/finer/finest has been requested
// Zap's debug is mapped to fine
Expand All @@ -76,45 +79,59 @@ var StackLevelFunc = uberzap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
})

// LoadFromConfigMap creates a config out of kubernetes config map
func (oc OpConfig) LoadFromConfigMap(cm *corev1.ConfigMap) {
for k, v := range DefaultOpConfig() {
oc[k] = v
func LoadFromConfigMap(oc *sync.Map, cm *corev1.ConfigMap) {
cfg := DefaultOpConfig()
cfg.Range(func(key, value interface{}) bool {
oc.Store(key, value)
return true
})
for k, v := range cm.Data {
oc.Store(k, v)
}
}

for k, v := range cm.Data {
oc[k] = v
// Loads a string value stored at key in the sync.Map oc or "" if it does not exist
func LoadFromConfig(oc *sync.Map, key string) string {
value, ok := oc.Load(key)
if !ok {
return ""
}
return value.(string)
}
func (oc OpConfig) CheckValidValue(key string, OperatorName string) error {
value := oc[key]

func CheckValidValue(oc *sync.Map, key string, OperatorName string) error {
value := LoadFromConfig(oc, key)

intValue, err := strconv.Atoi(value)
if err != nil {
oc.SetConfigMapDefaultValue(key)
SetConfigMapDefaultValue(oc, key)
return errors.New(key + " in ConfigMap: " + OperatorName + " has an invalid syntax, error: " + err.Error())
} else if key == OpConfigReconcileIntervalSeconds && intValue <= 0 {
oc.SetConfigMapDefaultValue(key)
SetConfigMapDefaultValue(oc, key)
return errors.New(key + " in ConfigMap: " + OperatorName + " is set to " + value + ". It must be greater than 0.")
} else if key == OpConfigReconcileIntervalPercentage && intValue < 0 {
oc.SetConfigMapDefaultValue(key)
SetConfigMapDefaultValue(oc, key)
return errors.New(key + " in ConfigMap: " + OperatorName + " is set to " + value + ". It must be greater than or equal to 0.")
}

return nil
}

// SetConfigMapDefaultValue sets default value for specified key
func (oc OpConfig) SetConfigMapDefaultValue(key string) {
func SetConfigMapDefaultValue(oc *sync.Map, key string) {
cm := DefaultOpConfig()
oc[key] = cm[key]
defaultValue, ok := cm.Load(key)
if ok {
oc.Store(key, defaultValue)
}
}

// Returns the zap log level corresponding to the value of the
// 'logLevel' key in the config map. Returns 'info' if they key
// is missing or contains an invalid value.
func (oc OpConfig) GetZapLogLevel() zapcore.Level {
level, ok := oc[OpConfigLogLevel]
if !ok {
func GetZapLogLevel(oc *sync.Map) zapcore.Level {
level := LoadFromConfig(oc, OpConfigLogLevel)
if level == "" {
return zLevelInfo
}
switch level {
Expand All @@ -135,13 +152,12 @@ func (oc OpConfig) GetZapLogLevel() zapcore.Level {
}

// DefaultOpConfig returns default configuration
func DefaultOpConfig() OpConfig {
cfg := OpConfig{}
cfg[OpConfigDefaultHostname] = ""
cfg[OpConfigCMCADuration] = "8766h"
cfg[OpConfigCMCertDuration] = "2160h"
cfg[OpConfigLogLevel] = logLevelInfo
cfg[OpConfigReconcileIntervalSeconds] = "15"
cfg[OpConfigReconcileIntervalPercentage] = "100"
func DefaultOpConfig() *sync.Map {
cfg := &sync.Map{}
cfg.Store(OpConfigDefaultHostname, "")
cfg.Store(OpConfigCMCADuration, "8766h")
cfg.Store(OpConfigCMCertDuration, "2160h")
cfg.Store(OpConfigReconcileIntervalSeconds, "15")
cfg.Store(OpConfigReconcileIntervalPercentage, "100")
return cfg
}
12 changes: 8 additions & 4 deletions internal/controller/runtimecomponent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/go-logr/logr"

ctrl "sigs.k8s.io/controller-runtime"
kcontroller "sigs.k8s.io/controller-runtime/pkg/controller"

appstacksv1 "github.com/application-stacks/runtime-component-operator/api/v1"
imagev1 "github.com/openshift/api/image/v1"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (r *RuntimeComponentReconciler) Reconcile(ctx context.Context, req ctrl.Req
reqLogger.Info("Failed to find runtime-component-operator config map")
appstacksutils.CreateConfigMap(OperatorName)
} else {
common.Config.LoadFromConfigMap(configMap)
common.LoadFromConfigMap(common.Config, configMap)
}

// Fetch the RuntimeComponent instance
Expand All @@ -123,11 +124,11 @@ func (r *RuntimeComponentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return reconcile.Result{}, err
}

if err = common.Config.CheckValidValue(common.OpConfigReconcileIntervalSeconds, OperatorName); err != nil {
if err = common.CheckValidValue(common.Config, common.OpConfigReconcileIntervalSeconds, OperatorName); err != nil {
return r.ManageError(err, common.StatusConditionTypeReconciled, instance)
}

if err = common.Config.CheckValidValue(common.OpConfigReconcileIntervalPercentage, OperatorName); err != nil {
if err = common.CheckValidValue(common.Config, common.OpConfigReconcileIntervalPercentage, OperatorName); err != nil {
return r.ManageError(err, common.StatusConditionTypeReconciled, instance)
}

Expand Down Expand Up @@ -620,7 +621,10 @@ func (r *RuntimeComponentReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&corev1.Secret{}, builder.WithPredicates(predSubResource)).
Owns(&appsv1.Deployment{}, builder.WithPredicates(predSubResWithGenCheck)).
Owns(&appsv1.StatefulSet{}, builder.WithPredicates(predSubResWithGenCheck)).
Owns(&autoscalingv1.HorizontalPodAutoscaler{}, builder.WithPredicates(predSubResource))
Owns(&autoscalingv1.HorizontalPodAutoscaler{}, builder.WithPredicates(predSubResource)).
WithOptions(kcontroller.Options{
MaxConcurrentReconciles: 8,
})

ok, _ := r.IsGroupVersionSupported(routev1.SchemeGroupVersion.String(), "Route")
if ok {
Expand Down
14 changes: 9 additions & 5 deletions utils/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func addStatusWarnings(ba common.BaseComponent) {
}

func getBaseReconcileInterval(s common.BaseComponentStatus) int32 {
baseIntervalInt, _ := strconv.Atoi(common.Config[common.OpConfigReconcileIntervalSeconds])
baseIntervalInt, _ := strconv.Atoi(common.LoadFromConfig(common.Config, common.OpConfigReconcileIntervalSeconds))
baseInterval := int32(baseIntervalInt)
s.SetReconcileInterval(&baseInterval)

Expand All @@ -213,8 +213,10 @@ func resetReconcileInterval(newCondition common.StatusCondition, s common.BaseCo
return time.Duration(baseInterval) * time.Second
}

// Precondition: Operator config values for common.OpConfigReconcileIntervalSeconds and common.OpConfigReconcileIntervalPercentage must be integers
func updateReconcileInterval(maxSeconds int, oldCondition common.StatusCondition, newCondition common.StatusCondition, s common.BaseComponentStatus) time.Duration {
var oldReconcileInterval int32

var newCount int32
count := oldCondition.GetUnchangedConditionCount()
if count == nil || s.GetReconcileInterval() == nil {
Expand All @@ -232,10 +234,11 @@ func updateReconcileInterval(maxSeconds int, oldCondition common.StatusCondition

// For every repeated 2 reconciliation errors, increase reconcile period
if newCount >= 2 && newCount%2 == 0 {
intervalIncreasePercentage, _ := strconv.ParseFloat(common.Config[common.OpConfigReconcileIntervalPercentage], 64)
intervalIncreasePercentage, _ := strconv.ParseFloat(common.LoadFromConfig(common.Config, common.OpConfigReconcileIntervalPercentage), 64)
exp := float64(newCount / 2)
increase := math.Pow(1+(intervalIncreasePercentage/100), exp)
baseInterval, _ := strconv.ParseFloat(common.Config[common.OpConfigReconcileIntervalSeconds], 64)

baseInterval, _ := strconv.ParseFloat(common.LoadFromConfig(common.Config, common.OpConfigReconcileIntervalSeconds), 64)
newInterval := int32(baseInterval * increase)

// Only increase to the maximum interval
Expand Down Expand Up @@ -515,10 +518,11 @@ func (r *ReconcilerBase) GenerateCMIssuer(namespace string, prefix string, CACom
Name: prefix + "-self-signed",
}

duration, err := time.ParseDuration(common.Config[common.OpConfigCMCADuration])
duration, err := time.ParseDuration(common.LoadFromConfig(common.Config, common.OpConfigCMCADuration))
if err != nil {
return err
}

caCert.Spec.Duration = &metav1.Duration{Duration: duration}
return nil
})
Expand Down Expand Up @@ -691,7 +695,7 @@ func (r *ReconcilerBase) GenerateSvcCertSecret(ba common.BaseComponent, prefix s

svcCert.Spec.SecretName = svcCertSecretName

duration, err := time.ParseDuration(common.Config[common.OpConfigCMCertDuration])
duration, err := time.ParseDuration(common.LoadFromConfig(common.Config, common.OpConfigCMCertDuration))
if err != nil {
return err
}
Expand Down
23 changes: 16 additions & 7 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ func CustomizeRoute(route *routev1.Route, ba common.BaseComponent, key string, c
route.Annotations = MergeMaps(route.Annotations, rt.GetAnnotations())

host := rt.GetHost()
if host == "" && common.Config[common.OpConfigDefaultHostname] != "" {
host = obj.GetName() + "-" + obj.GetNamespace() + "." + common.Config[common.OpConfigDefaultHostname]
defaultHostName := common.LoadFromConfig(common.Config, common.OpConfigDefaultHostname)
if host == "" && defaultHostName != "" {
host = obj.GetName() + "-" + obj.GetNamespace() + "." + defaultHostName
}

ba.GetStatus().SetReference(common.StatusReferenceRouteHost, host)
route.Spec.Host = host
route.Spec.Path = rt.GetPath()
Expand Down Expand Up @@ -1438,10 +1440,11 @@ func CustomizeIngress(ing *networkingv1.Ingress, ba common.BaseComponent) {
if ba.GetService().GetPortName() != "" {
servicePort = ba.GetService().GetPortName()
}

if host == "" && common.Config[common.OpConfigDefaultHostname] != "" {
host = obj.GetName() + "-" + obj.GetNamespace() + "." + common.Config[common.OpConfigDefaultHostname]
defaultHostName := common.LoadFromConfig(common.Config, common.OpConfigDefaultHostname)
if host == "" && defaultHostName != "" {
host = obj.GetName() + "-" + obj.GetNamespace() + "." + defaultHostName
}

if host == "" {
l := log.WithValues("Request.Namespace", obj.GetNamespace(), "Request.Name", obj.GetName())
l.Info("No Ingress hostname is provided. Ingress might not function correctly without hostname. It is recommended to set Ingress host or to provide default value through operator's config map.")
Expand Down Expand Up @@ -1770,7 +1773,12 @@ func CreateConfigMap(mapName string) {
// store it in a new map
common.Config = common.DefaultOpConfig()
_, cerr := controllerutil.CreateOrUpdate(context.TODO(), client, newConfigMap, func() error {
newConfigMap.Data = common.Config
newConfigMapData := make(map[string]string)
common.Config.Range(func(key, value interface{}) bool {
newConfigMapData[key.(string)] = value.(string)
return true
})
newConfigMap.Data = newConfigMapData
return nil
})
if cerr != nil {
Expand Down Expand Up @@ -1875,7 +1883,8 @@ func ShouldDeleteRoute(ba common.BaseComponent) bool {
// The host was previously set.
// If the host is now empty, delete the old route
rt := ba.GetRoute()
if rt == nil || (rt.GetHost() == "" && common.Config[common.OpConfigDefaultHostname] == "") {
defaultHostName := common.LoadFromConfig(common.Config, common.OpConfigDefaultHostname)
if rt == nil || (rt.GetHost() == "" && defaultHostName == "") {
return true
}
}
Expand Down
8 changes: 7 additions & 1 deletion utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ type Test struct {
actual interface{}
}

func TestMain(m *testing.M) {
common.Config = common.DefaultOpConfig()
rc := m.Run()
os.Exit(rc)
}

func TestCustomizeRoute(t *testing.T) {
logger := zap.New()
logf.SetLogger(logger)
Expand Down Expand Up @@ -783,7 +789,7 @@ func TestShouldDeleteRoute(t *testing.T) {

// When there is a defaultHost in config.
// This should be ignored as the route is nil
common.Config[common.OpConfigDefaultHostname] = "default.host"
common.Config.Store(common.OpConfigDefaultHostname, "default.host")
noPreviousWithDefault := ShouldDeleteRoute(runtime)

// If the route object exists with no host,
Expand Down

0 comments on commit a2f803d

Please sign in to comment.