Skip to content

Commit

Permalink
ROX-20872: fix fleetshard reconciler race condition (#1629)
Browse files Browse the repository at this point in the history
  • Loading branch information
ludydoo authored Feb 6, 2024
1 parent e46f7ca commit c48e249
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 30 deletions.
102 changes: 74 additions & 28 deletions fleetshard/pkg/central/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ const (
tenantImagePullSecretName = "stackrox" // pragma: allowlist secret
)

type verifyAuthProviderExistsFunc func(ctx context.Context, central private.ManagedCentral,
client ctrlClient.Client) (bool, error)
type verifyAuthProviderExistsFunc func(ctx context.Context, central private.ManagedCentral, client ctrlClient.Client) (bool, error)
type needsReconcileFunc func(changed bool, central *v1alpha1.Central, storedSecrets []string) bool
type restoreCentralSecretsFunc func(ctx context.Context, remoteCentral private.ManagedCentral) error
type areSecretsStoredFunc func(secretsStored []string) bool

// CentralReconcilerOptions are the static options for creating a reconciler.
type CentralReconcilerOptions struct {
Expand All @@ -111,6 +113,7 @@ type CentralReconciler struct {
central private.ManagedCentral
status *int32
lastCentralHash [16]byte
lastCentralHashTime time.Time
useRoutes bool
Resources bool
routeService *k8s.RouteService
Expand All @@ -133,6 +136,11 @@ type CentralReconciler struct {
hasAuthProvider bool
verifyAuthProviderFunc verifyAuthProviderExistsFunc
tenantImagePullSecret []byte
clock clock

areSecretsStoredFunc areSecretsStoredFunc
needsReconcileFunc needsReconcileFunc
restoreCentralSecretsFunc restoreCentralSecretsFunc
}

// Reconcile takes a private.ManagedCentral and tries to install it into the cluster managed by the fleet-shard.
Expand All @@ -146,16 +154,29 @@ func (r *CentralReconciler) Reconcile(ctx context.Context, remoteCentral private
}
defer atomic.StoreInt32(r.status, FreeStatus)

central, err := r.getInstanceConfig(&remoteCentral)
centralHash, err := r.computeCentralHash(remoteCentral)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "computing central hash")
}

changed, err := r.centralChanged(remoteCentral)
central, err := r.getInstanceConfig(&remoteCentral)
if err != nil {
return nil, errors.Wrap(err, "checking if central changed")
return nil, err
}
needsReconcile := r.needsReconcile(changed, central, remoteCentral.Metadata.SecretsStored)

shouldUpdateCentralHash := false
defer func() {
if shouldUpdateCentralHash {
r.lastCentralHash = centralHash
r.lastCentralHashTime = time.Now()
} else {
r.lastCentralHash = [16]byte{}
}
}()

changed := r.centralChanged(centralHash)

needsReconcile := r.needsReconcileFunc(changed, central, remoteCentral.Metadata.SecretsStored)

if !needsReconcile && r.shouldSkipReadyCentral(remoteCentral) {
return nil, ErrCentralNotChanged
Expand All @@ -166,7 +187,9 @@ func (r *CentralReconciler) Reconcile(ctx context.Context, remoteCentral private
remoteCentralNamespace := remoteCentral.Metadata.Namespace

if remoteCentral.Metadata.DeletionTimestamp != "" {
return r.reconcileInstanceDeletion(ctx, &remoteCentral, central)
status, err := r.reconcileInstanceDeletion(ctx, &remoteCentral, central)
shouldUpdateCentralHash = err == nil
return status, err
}

namespaceLabels := map[string]string{
Expand All @@ -187,7 +210,7 @@ func (r *CentralReconciler) Reconcile(ctx context.Context, remoteCentral private
}
}

err = r.restoreCentralSecrets(ctx, remoteCentral)
err = r.restoreCentralSecretsFunc(ctx, remoteCentral)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,11 +283,7 @@ func (r *CentralReconciler) Reconcile(ctx context.Context, remoteCentral private
return nil, err
}

// Setting the last central hash must always be executed as the last step.
// defer can't be used for this call because it is also executed after the reconcile failed.
if err := r.setLastCentralHash(remoteCentral); err != nil {
return nil, errors.Wrap(err, "setting central reconcilation cache")
}
shouldUpdateCentralHash = true

logStatus := *status
logStatus.Secrets = obscureSecrets(status.Secrets)
Expand Down Expand Up @@ -685,9 +704,10 @@ func (r *CentralReconciler) reconcileCentral(ctx context.Context, remoteCentral
return errors.Wrapf(err, "incrementing Central %v revision", centralKey)
}

if err := r.client.Update(ctx, desiredCentral); err != nil {
if err := r.client.Update(context.Background(), desiredCentral); err != nil {
return errors.Wrapf(err, "updating Central %v", centralKey)
}

}

return nil
Expand Down Expand Up @@ -1079,22 +1099,20 @@ func (r *CentralReconciler) getDatabaseID(ctx context.Context, remoteCentralName
}

// centralChanged compares the given central to the last central reconciled using a hash
func (r *CentralReconciler) centralChanged(central private.ManagedCentral) (bool, error) {
currentHash, err := util.MD5SumFromJSONStruct(&central)
if err != nil {
return true, errors.Wrap(err, "hashing central")
}
return !bytes.Equal(r.lastCentralHash[:], currentHash[:]), nil
func (r *CentralReconciler) centralChanged(currentHash [16]byte) bool {
return !bytes.Equal(r.lastCentralHash[:], currentHash[:])
}

func (r *CentralReconciler) setLastCentralHash(currentHash [16]byte) {
r.lastCentralHash = currentHash
}

func (r *CentralReconciler) setLastCentralHash(central private.ManagedCentral) error {
func (r *CentralReconciler) computeCentralHash(central private.ManagedCentral) ([16]byte, error) {
hash, err := util.MD5SumFromJSONStruct(&central)
if err != nil {
return fmt.Errorf("calculating MD5 from JSON: %w", err)
return [16]byte{}, fmt.Errorf("calculating MD5 from JSON: %w", err)
}

r.lastCentralHash = hash
return nil
return hash, nil
}

func (r *CentralReconciler) getNamespace(name string) (*corev1.Namespace, error) {
Expand Down Expand Up @@ -1639,14 +1657,18 @@ func (r *CentralReconciler) shouldSkipReadyCentral(remoteCentral private.Managed
}

func (r *CentralReconciler) needsReconcile(changed bool, central *v1alpha1.Central, storedSecrets []string) bool {
if !r.areSecretsStored(storedSecrets) {
if !r.areSecretsStoredFunc(storedSecrets) {
return true
}

if changed {
return true
}

if r.clock.Now().Sub(r.lastCentralHashTime) > time.Minute*15 {
return true
}

forceReconcile, ok := central.Labels["rhacs.redhat.com/force-reconcile"]
return ok && forceReconcile == "true"
}
Expand Down Expand Up @@ -1723,7 +1745,7 @@ func NewCentralReconciler(k8sClient ctrlClient.Client, fleetmanagerClient *fleet
secretCipher cipher.Cipher, encryptionKeyGenerator cipher.KeyGenerator,
opts CentralReconcilerOptions,
) *CentralReconciler {
return &CentralReconciler{
r := &CentralReconciler{
client: k8sClient,
fleetmanagerClient: fleetmanagerClient,
central: central,
Expand All @@ -1748,7 +1770,13 @@ func NewCentralReconciler(k8sClient ctrlClient.Client, fleetmanagerClient *fleet
tenantImagePullSecret: []byte(opts.TenantImagePullSecret),

resourcesChart: resourcesChart,
clock: realClock{},
}
r.needsReconcileFunc = r.needsReconcile

r.restoreCentralSecretsFunc = r.restoreCentralSecrets //pragma: allowlist secret
r.areSecretsStoredFunc = r.areSecretsStored //pragma: allowlist secret
return r
}

func obscureSecrets(secrets map[string]string) map[string]string {
Expand All @@ -1760,3 +1788,21 @@ func obscureSecrets(secrets map[string]string) map[string]string {

return obscuredSecrets
}

type clock interface {
Now() time.Time
}

type realClock struct{}

func (realClock) Now() time.Time {
return time.Now()
}

type fakeClock struct {
NowTime time.Time
}

func (f fakeClock) Now() time.Time {
return f.NowTime
}
Loading

0 comments on commit c48e249

Please sign in to comment.