From 341fc3b47e59e6e32fd83db92c0d1a7ed1381986 Mon Sep 17 00:00:00 2001 From: lamai93 Date: Mon, 5 Nov 2018 23:15:09 +0100 Subject: [PATCH 1/2] Fixing uninitialised `lastNumberOfServers`. --- pkg/deployment/cluster_scaling_integration.go | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go index 1b2e1df5c..a3533d68e 100644 --- a/pkg/deployment/cluster_scaling_integration.go +++ b/pkg/deployment/cluster_scaling_integration.go @@ -65,6 +65,7 @@ func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration { // SendUpdateToCluster records the given spec to be sended to the cluster. func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) { + ci.log.Debug().Msg("SendUpdateToCluster called") ci.pendingUpdate.mutex.Lock() defer ci.pendingUpdate.mutex.Unlock() ci.pendingUpdate.spec = &spec @@ -75,6 +76,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct start := time.Now() goodInspections := 0 for { + ci.log.Debug().Msg("inspection loop for cluster int.") delay := time.Second * 2 // Is deployment in running state @@ -97,6 +99,8 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct goodInspections++ } } + } else { + ci.log.Debug().Msg("cluster Phase not Running") } select { @@ -112,6 +116,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct // Perform a single inspection of the cluster func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectSuccess bool) error { log := ci.log + log.Debug().Msg("inspect cluster for scaling integration") c, err := ci.depl.clientCache.GetDatabase(ctx) if err != nil { return maskAny(err) @@ -124,6 +129,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS return maskAny(err) } if req.Coordinators == nil && req.DBServers == nil { + log.Debug().Msg("Nothing to check") // Nothing to check return nil } @@ -132,15 +138,32 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS ci.lastNumberOfServers.mutex.Lock() defer ci.lastNumberOfServers.mutex.Unlock() desired := ci.lastNumberOfServers.NumberOfServers - if req.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() { + if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() { // #Coordinator has changed coordinatorsChanged = true } - if req.DBServers != nil && req.GetDBServers() != desired.GetDBServers() { + if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() { // #DBServers has changed dbserversChanged = true } if !coordinatorsChanged && !dbserversChanged { + // if there is nothing to change, check if we naver have asked the cluster before + // if so, fill in the values for the first time. + // This happens, when the operator is redeployed and there has not been any + // update events yet. + if desired.Coordinators == nil || desired.DBServers == nil { + //ci.lastNumberOfServers.mutex.Lock() + //defer ci.lastNumberOfServers.mutex.Unlock() + ci.log.Debug().Msg("Some of desired is nil") + if req.Coordinators != nil { + ci.lastNumberOfServers.NumberOfServers.Coordinators = req.Coordinators + } + if req.DBServers != nil { + ci.lastNumberOfServers.NumberOfServers.DBServers = req.DBServers + } + } + + ci.log.Debug().Msg("Nothing has changed") // Nothing has changed return nil } @@ -165,6 +188,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS // Restore original spec in cluster ci.SendUpdateToCluster(current.Spec) } else { + log.Debug().Msg("UpdatedCRSpec via agency") if err := ci.depl.updateCRSpec(*newSpec); err != nil { log.Warn().Err(err).Msg("Failed to update current deployment") return maskAny(err) @@ -176,12 +200,14 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS // updateClusterServerCount updates the intended number of servers of the cluster. // Returns true when it is safe to ask the cluster for updates. func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context, expectSuccess bool) (bool, error) { + ci.log.Debug().Msg("updateClusterServerCount") // Any update needed? ci.pendingUpdate.mutex.Lock() spec := ci.pendingUpdate.spec ci.pendingUpdate.mutex.Unlock() if spec == nil { // Nothing pending + ci.log.Debug().Msg("Nothing pending") return true, nil } @@ -198,13 +224,16 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex ci.lastNumberOfServers.mutex.Unlock() // This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop) - if coordinatorCount != lastNumberOfServers.GetCoordinators() && dbserverCount != lastNumberOfServers.GetDBServers() { + if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() { + ci.log.Debug().Msg("Set number of servers now") if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil { if expectSuccess { log.Debug().Err(err).Msg("Failed to set number of servers") } return false, maskAny(err) } + } else { + ci.log.Debug().Msg("Nothing has changed") } // Success, now update internal state From 9bcee7f08b3b7e11c1d5fe76f3d278d108c7dacf Mon Sep 17 00:00:00 2001 From: lamai93 Date: Tue, 6 Nov 2018 01:54:38 +0100 Subject: [PATCH 2/2] Clean up. --- pkg/deployment/cluster_scaling_integration.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go index a3533d68e..6067a34b2 100644 --- a/pkg/deployment/cluster_scaling_integration.go +++ b/pkg/deployment/cluster_scaling_integration.go @@ -65,7 +65,6 @@ func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration { // SendUpdateToCluster records the given spec to be sended to the cluster. func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) { - ci.log.Debug().Msg("SendUpdateToCluster called") ci.pendingUpdate.mutex.Lock() defer ci.pendingUpdate.mutex.Unlock() ci.pendingUpdate.spec = &spec @@ -76,7 +75,6 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct start := time.Now() goodInspections := 0 for { - ci.log.Debug().Msg("inspection loop for cluster int.") delay := time.Second * 2 // Is deployment in running state @@ -99,8 +97,6 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct goodInspections++ } } - } else { - ci.log.Debug().Msg("cluster Phase not Running") } select { @@ -116,7 +112,6 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct // Perform a single inspection of the cluster func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectSuccess bool) error { log := ci.log - log.Debug().Msg("inspect cluster for scaling integration") c, err := ci.depl.clientCache.GetDatabase(ctx) if err != nil { return maskAny(err) @@ -129,7 +124,6 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS return maskAny(err) } if req.Coordinators == nil && req.DBServers == nil { - log.Debug().Msg("Nothing to check") // Nothing to check return nil } @@ -147,14 +141,11 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS dbserversChanged = true } if !coordinatorsChanged && !dbserversChanged { - // if there is nothing to change, check if we naver have asked the cluster before + // if there is nothing to change, check if we never have asked the cluster before // if so, fill in the values for the first time. // This happens, when the operator is redeployed and there has not been any // update events yet. if desired.Coordinators == nil || desired.DBServers == nil { - //ci.lastNumberOfServers.mutex.Lock() - //defer ci.lastNumberOfServers.mutex.Unlock() - ci.log.Debug().Msg("Some of desired is nil") if req.Coordinators != nil { ci.lastNumberOfServers.NumberOfServers.Coordinators = req.Coordinators } @@ -163,7 +154,6 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS } } - ci.log.Debug().Msg("Nothing has changed") // Nothing has changed return nil } @@ -171,7 +161,6 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS apiObject := ci.depl.apiObject current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(apiObject.Namespace).Get(apiObject.Name, metav1.GetOptions{}) if err != nil { - log.Debug().Err(err).Msg("Failed to get current deployment") return maskAny(err) } newSpec := current.Spec.DeepCopy() @@ -188,7 +177,6 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS // Restore original spec in cluster ci.SendUpdateToCluster(current.Spec) } else { - log.Debug().Msg("UpdatedCRSpec via agency") if err := ci.depl.updateCRSpec(*newSpec); err != nil { log.Warn().Err(err).Msg("Failed to update current deployment") return maskAny(err) @@ -200,14 +188,12 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS // updateClusterServerCount updates the intended number of servers of the cluster. // Returns true when it is safe to ask the cluster for updates. func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context, expectSuccess bool) (bool, error) { - ci.log.Debug().Msg("updateClusterServerCount") // Any update needed? ci.pendingUpdate.mutex.Lock() spec := ci.pendingUpdate.spec ci.pendingUpdate.mutex.Unlock() if spec == nil { // Nothing pending - ci.log.Debug().Msg("Nothing pending") return true, nil } @@ -225,15 +211,12 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex // This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop) if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() { - ci.log.Debug().Msg("Set number of servers now") if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil { if expectSuccess { log.Debug().Err(err).Msg("Failed to set number of servers") } return false, maskAny(err) } - } else { - ci.log.Debug().Msg("Nothing has changed") } // Success, now update internal state