Initial blueprint changes for preserving VRG as Secondary always #1652

Merged
merged 3 commits into from Nov 16, 2024
Changes from all commits
214 changes: 20 additions & 194 deletions internal/controller/drplacementcontrol.go
@@ -174,7 +174,7 @@ func (d *DRPCInstance) RunInitialDeployment() (bool, error) {
}

// If we get here, the deployment is successful
err = d.EnsureVolSyncReplicationSetup(homeCluster)
err = d.EnsureSecondaryReplicationSetup(homeCluster)
if err != nil {
return !done, err
}
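
The call rename above is the heart of the change: secondary replication setup is no longer a VolSync-only concern. The body of EnsureSecondaryReplicationSetup is not shown in this diff, so the sketch below is only a plausible shape; the branch on IsVolSyncReplicationRequired and the ensureVRGManifestWorkSecondary helper are assumptions, not code from the PR.

// Sketch only - not part of this diff. ensureVRGManifestWorkSecondary is a hypothetical helper name.
func (d *DRPCInstance) EnsureSecondaryReplicationSetup(srcCluster string) error {
	// VolSync workloads still need the ReplicationDestination to be created explicitly.
	required, err := d.IsVolSyncReplicationRequired(srcCluster)
	if err != nil {
		return err
	}

	if required {
		return d.EnsureVolSyncReplicationSetup(srcCluster)
	}

	// For VolRep and sync workloads, keep a Secondary VRG on the peer instead of deleting it,
	// so failover validation has a deterministic Secondary VRG to check.
	return d.ensureVRGManifestWorkSecondary(srcCluster)
}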
@@ -386,7 +386,7 @@ func (d *DRPCInstance) RunFailover() (bool, error) {
}

// isValidFailoverTarget determines if the passed in cluster is a valid target to failover to. A valid failover target
// may already be Primary, if it is Secondary then it has to be protecting PVCs with VolSync.
// may already be Primary
// NOTE: Currently there is a gap where, right after DR protection when a Secondary VRG is not yet created for VolSync
// workloads, a failover if initiated will pass these checks. When we fix to retain VRG for VR as well, a more
// deterministic check for VRG as Secondary can be performed.
@@ -397,13 +397,6 @@ func (d *DRPCInstance) isValidFailoverTarget(cluster string) bool {

vrg, err := d.reconciler.MCVGetter.GetVRGFromManagedCluster(d.instance.Name, d.vrgNamespace, cluster, annotations)
if err != nil {
if errors.IsNotFound(err) {
d.log.Info(fmt.Sprintf("VRG not found on %q", cluster))

// Valid target as there would be no VRG for VR and Sync cases
return true
}

return false
}

@@ -413,12 +406,14 @@ func (d *DRPCInstance) isValidFailoverTarget(cluster string) bool {
}

// Valid target only if VRG is protecting PVCs with VS and its status is also Secondary
if d.drType == DRTypeAsync && vrg.Status.State == rmn.SecondaryState &&
!vrg.Spec.VolSync.Disabled && len(vrg.Spec.VolSync.RDSpec) != 0 {
return true
if vrg.Status.State != rmn.SecondaryState || vrg.Status.ObservedGeneration != vrg.Generation {
d.log.Info(fmt.Sprintf("VRG on %s has not transitioned to secondary yet. Spec-State/Status-State %s/%s",
cluster, vrg.Spec.ReplicationState, vrg.Status.State))

return false
}

return false
return true
}

func (d *DRPCInstance) checkClusterFenced(cluster string, drClusters []rmn.DRCluster) (bool, error) {
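
Taken together, the hunks above change how a failover target is validated: a missing VRG is no longer accepted, and the VolSync-specific RDSpec check is replaced by a generic one that requires the VRG to report Secondary with a converged generation. The sketch below assembles the resulting function from the visible added lines; the annotation setup and the Primary-VRG check are paraphrased from elided context and should be read as assumptions.

// Sketch assembled from the hunks above; lines marked "assumed" are not visible in this diff.
func (d *DRPCInstance) isValidFailoverTarget(cluster string) bool {
	annotations := make(map[string]string) // assumed: annotations carry the DRPC name/namespace
	annotations[DRPCNameAnnotation] = d.instance.Name
	annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

	vrg, err := d.reconciler.MCVGetter.GetVRGFromManagedCluster(d.instance.Name, d.vrgNamespace, cluster, annotations)
	if err != nil {
		// NotFound is no longer treated as a valid target; any error disqualifies the cluster.
		return false
	}

	if isVRGPrimary(vrg) { // assumed: an already-Primary VRG remains a valid target
		return true
	}

	// Valid only if the VRG has actually transitioned to Secondary and its status is current.
	if vrg.Status.State != rmn.SecondaryState || vrg.Status.ObservedGeneration != vrg.Generation {
		d.log.Info(fmt.Sprintf("VRG on %s has not transitioned to secondary yet. Spec-State/Status-State %s/%s",
			cluster, vrg.Spec.ReplicationState, vrg.Status.State))

		return false
	}

	return true
}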
@@ -933,7 +928,7 @@ func (d *DRPCInstance) ensureActionCompleted(srcCluster string) (bool, error) {
}

// Cleanup and setup VolSync if enabled
err = d.ensureCleanupAndVolSyncReplicationSetup(srcCluster)
err = d.ensureCleanupAndSecondaryReplicationSetup(srcCluster)
if err != nil {
return !done, err
}
@@ -945,7 +940,7 @@ func (d *DRPCInstance) ensureActionCompleted(srcCluster string) (bool, error) {
return done, nil
}

func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string) error {
func (d *DRPCInstance) ensureCleanupAndSecondaryReplicationSetup(srcCluster string) error {
// If we have VolSync replication, this is the perfect time to reset the RDSpec
// on the primary. This will cause the RD to be cleared on the primary
err := d.ResetVolSyncRDOnPrimary(srcCluster)
@@ -968,7 +963,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string

// After we ensured peers are clean, The VolSync ReplicationSource (RS) will automatically get
// created, but for the ReplicationDestination, we need to explicitly tell the VRG to create it.
err = d.EnsureVolSyncReplicationSetup(srcCluster)
err = d.EnsureSecondaryReplicationSetup(srcCluster)
if err != nil {
return err
}
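
The renamed ensureCleanupAndSecondaryReplicationSetup keeps the shape of the old VolSync-only version: reset the RDSpec on the primary, make sure the peers are clean, then request secondary replication setup. A condensed sketch of that flow follows; the middle cleanup step sits in an elided part of the diff and is assumed to go through EnsureCleanup.

// Condensed sketch; the step between the two visible calls is paraphrased from elided context.
func (d *DRPCInstance) ensureCleanupAndSecondaryReplicationSetup(srcCluster string) error {
	// Reset the RDSpec on the primary so its ReplicationDestination gets cleared.
	if err := d.ResetVolSyncRDOnPrimary(srcCluster); err != nil {
		return err
	}

	// assumed: ensure the peers (every cluster except srcCluster) are cleaned up / Secondary.
	if err := d.EnsureCleanup(srcCluster); err != nil {
		return err
	}

	// The VolSync ReplicationSource is created automatically; the ReplicationDestination
	// (and, with this PR, the Secondary VRG for VolRep) must be requested explicitly.
	return d.EnsureSecondaryReplicationSetup(srcCluster)
}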
@@ -1449,79 +1444,6 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {
return true
}

func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
d.log.Info("Cleaning up secondaries.")

for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if skipCluster == clusterName {
continue
}

// If VRG hasn't been deleted, then make sure that the MW for it is deleted and
// return and wait, but first make sure that the cluster is accessible
if err := checkAccessToVRGOnCluster(d.reconciler.MCVGetter, d.instance.GetName(), d.instance.GetNamespace(),
d.vrgNamespace, clusterName); err != nil {
return false, err
}

mwDeleted, err := d.ensureVRGManifestWorkOnClusterDeleted(clusterName)
if err != nil {
return false, err
}

if !mwDeleted {
return false, nil
}

d.log.Info("MW has been deleted. Check the VRG")

if !d.ensureVRGDeleted(clusterName) {
d.log.Info("VRG has not been deleted yet", "cluster", clusterName)

return false, nil
}

err = d.reconciler.MCVGetter.DeleteVRGManagedClusterView(d.instance.Name, d.vrgNamespace, clusterName,
rmnutil.MWTypeVRG)
// MW is deleted, VRG is deleted, so we no longer need MCV for the VRG
if err != nil {
d.log.Info("Deletion of VRG MCV failed")

return false, fmt.Errorf("deletion of VRG MCV failed %w", err)
}

err = d.reconciler.MCVGetter.DeleteNamespaceManagedClusterView(d.instance.Name, d.vrgNamespace, clusterName,
rmnutil.MWTypeNS)
// MCV for Namespace is no longer needed
if err != nil {
d.log.Info("Deletion of Namespace MCV failed")

return false, fmt.Errorf("deletion of namespace MCV failed %w", err)
}
}

return true, nil
}

func checkAccessToVRGOnCluster(mcvGetter rmnutil.ManagedClusterViewGetter,
name, drpcNamespace, vrgNamespace, clusterName string,
) error {
annotations := make(map[string]string)

annotations[DRPCNameAnnotation] = name
annotations[DRPCNamespaceAnnotation] = drpcNamespace

_, err := mcvGetter.GetVRGFromManagedCluster(name,
vrgNamespace, clusterName, annotations)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
}

return nil
}

func (d *DRPCInstance) updateUserPlacementRule(homeCluster, reason string) error {
d.log.Info(fmt.Sprintf("Updating user Placement %s homeCluster %s",
d.userPlacement.GetName(), homeCluster))
@@ -1954,6 +1876,7 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {

condition := findCondition(d.instance.Status.Conditions, rmn.ConditionPeerReady)

// Because we init conditions we will always find the condition and not move it to ReasonProgressing?
if condition == nil {
msg := "Starting cleanup check"
d.log.Info(msg)
@@ -1973,43 +1896,12 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {

d.log.Info(fmt.Sprintf("PeerReady Condition is %s, msg: %s", condition.Status, condition.Message))

// IFF we have VolSync PVCs, then no need to clean up
homeCluster := clusterToSkip

repReq, err := d.IsVolSyncReplicationRequired(homeCluster)
if err != nil {
return fmt.Errorf("failed to check if VolSync replication is required (%w)", err)
}

if repReq {
return d.cleanupForVolSync(clusterToSkip)
}

clean, err := d.cleanupSecondaries(clusterToSkip)
if err != nil {
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionFalse, rmn.ReasonCleaning, err.Error())

return err
}

if !clean {
msg := "cleaning up secondaries"
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionFalse, rmn.ReasonCleaning, msg)

return fmt.Errorf("waiting to clean secondaries")
}

addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionTrue, rmn.ReasonSuccess, "Cleaned")

return nil
return d.cleanupSecondaries(clusterToSkip)
}

//nolint:gocognit
func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {
d.log.Info("VolSync needs both VRGs. Ensure secondary setup on peer")
func (d *DRPCInstance) cleanupSecondaries(clusterToSkip string) error {
d.log.Info("Ensure secondary setup on peer")

peersReady := true

@@ -2018,15 +1910,19 @@ func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {
continue
}

// Update PeerReady condition to appropriate reasons in here!
justUpdated, err := d.updateVRGState(clusterName, rmn.Secondary)
if err != nil {
d.log.Info(fmt.Sprintf("Failed to update VRG state for cluster %s. Err (%v)", clusterName, err))

peersReady = false

// Recreate the VRG ManifestWork for the secondary. This typically happens during Hub Recovery.
// Ideally this will never be called due to adoption of VRG in place; in the case of upgrades from the older
// scheme where VRG was not preserved for VR workloads, this can be hit IFF the upgrade happened when some
// workload was not in peerReady state.
if errors.IsNotFound(err) {
err := d.ensureVolSyncSetup(clusterToSkip)
err := d.EnsureSecondaryReplicationSetup(clusterToSkip)
if err != nil {
return err
}
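
With the rename, cleanupSecondaries no longer deletes the peer VRG and its ManifestWork; it demotes the peer VRG to Secondary, and only recreates the ManifestWork when it is missing (the hub-recovery/upgrade case called out in the new comment). A condensed sketch of the loop follows; the trailing PeerReady handling is paraphrased from the elided tail of the function.

// Condensed sketch of the renamed cleanupSecondaries; the PeerReady update at the end is
// paraphrased, since that part of the function is elided in this diff.
func (d *DRPCInstance) cleanupSecondaries(clusterToSkip string) error {
	peersReady := true

	for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
		if clusterToSkip == clusterName {
			continue
		}

		// Demote the peer VRG to Secondary instead of deleting it.
		justUpdated, err := d.updateVRGState(clusterName, rmn.Secondary)
		if err != nil {
			peersReady = false

			// The VRG ManifestWork is missing (hub recovery, or an upgrade from the older
			// scheme): recreate it as Secondary and retry on a later reconcile.
			if errors.IsNotFound(err) {
				if err := d.EnsureSecondaryReplicationSetup(clusterToSkip); err != nil {
					return err
				}
			}

			break
		}

		// If the spec was just flipped to Secondary, wait for the status to catch up.
		if justUpdated || !d.ensureVRGIsSecondaryOnCluster(clusterName) {
			peersReady = false

			break
		}
	}

	if !peersReady {
		return fmt.Errorf("still waiting for peer to be ready")
	}

	// assumed: mark PeerReady once every peer reports Secondary.
	addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
		metav1.ConditionTrue, rmn.ReasonSuccess, "Ready")

	return nil
}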
@@ -2055,48 +1951,6 @@ func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {
return nil
}

func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string) (bool, error) {
d.log.Info("Ensuring MW for the VRG is deleted", "cluster", clusterName)

const done = true

mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName)
if err != nil {
if errors.IsNotFound(err) {
return done, nil
}

return !done, fmt.Errorf("failed to retrieve ManifestWork (%w)", err)
}

if rmnutil.ResourceIsDeleted(mw) {
d.log.Info("Waiting for VRG MW to be fully deleted", "cluster", clusterName)
// As long as the Manifestwork still exist, then we are not done
return !done, nil
}

// If .spec.ReplicateSpec has not already been updated to secondary, then update it.
// If we do update it to secondary, then we have to wait for the MW to be applied
updated, err := d.updateVRGState(clusterName, rmn.Secondary)
if err != nil || updated {
return !done, err
}

if d.ensureVRGIsSecondaryOnCluster(clusterName) {
// delete VRG manifest work
err = d.mwu.DeleteManifestWork(d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG), clusterName)
if err != nil {
return !done, fmt.Errorf("%w", err)
}
}

d.log.Info("Request not complete yet", "cluster", clusterName)

// IF we get here, either the VRG has not transitioned to secondary (yet) or delete didn't succeed. In either cases,
// we need to make sure that the VRG object is deleted. IOW, we still have to wait
return !done, nil
}

// ensureVRGIsSecondaryEverywhere iterates through all the clusters in the DRCluster set,
// and for each cluster, it checks whether the VRG (if exists) is secondary. It will skip
// a cluster if provided. It returns true if all clusters report secondary for the VRG,
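
The doc comment above describes ensureVRGIsSecondaryEverywhere, which the new cleanup path leans on; its body is not part of this diff, so the sketch below is derived from the comment alone and from the per-cluster helper it implies.

// Sketch based on the doc comment only; the actual body is not shown in this diff.
func (d *DRPCInstance) ensureVRGIsSecondaryEverywhere(clusterToSkip string) bool {
	for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
		if clusterToSkip == clusterName {
			continue
		}

		// A cluster whose VRG (if any) does not report Secondary means we are not done yet.
		if !d.ensureVRGIsSecondaryOnCluster(clusterName) {
			return false
		}
	}

	return true
}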
@@ -2227,34 +2081,6 @@ func (d *DRPCInstance) ensureDataProtectedOnCluster(clusterName string) bool {
return true
}

func (d *DRPCInstance) ensureVRGDeleted(clusterName string) bool {
d.mcvRequestInProgress = false

annotations := make(map[string]string)

annotations[DRPCNameAnnotation] = d.instance.Name
annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

vrg, err := d.reconciler.MCVGetter.GetVRGFromManagedCluster(d.instance.Name,
d.vrgNamespace, clusterName, annotations)
if err != nil {
// Only NotFound error is accepted
if errors.IsNotFound(err) {
return true // ensured
}

d.log.Info("Failed to get VRG", "error", err)

d.mcvRequestInProgress = true
// Retry again
return false
}

d.log.Info(fmt.Sprintf("VRG not deleted(%s)", vrg.Name))

return false
}

func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationState) (bool, error) {
d.log.Info(fmt.Sprintf("Updating VRG ReplicationState to %s for cluster %s", state, clusterName))
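
updateVRGState is the pivot of the new flow: it flips the VRG's spec.ReplicationState inside the ManifestWork, reports whether it just did so (callers then wait for the status to converge), and surfaces a NotFound error when the ManifestWork is gone, which is what triggers the recreate path above. Its body is cut off at the end of this diff, so the following is a sketch under those assumptions; getVRGFromManifestWork and updateManifestWork are assumed helper names.

// Sketch only; the real body is truncated in this diff and the two helpers are assumed names.
func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationState) (bool, error) {
	d.log.Info(fmt.Sprintf("Updating VRG ReplicationState to %s for cluster %s", state, clusterName))

	vrg, err := d.getVRGFromManifestWork(clusterName) // assumed: propagates IsNotFound when the MW is gone
	if err != nil {
		return false, err
	}

	if vrg.Spec.ReplicationState == state {
		// Nothing to change; the caller can go straight to checking the VRG status.
		return false, nil
	}

	vrg.Spec.ReplicationState = state

	if err := d.updateManifestWork(clusterName, vrg); err != nil { // assumed helper
		return false, err
	}

	// Just updated: the caller should wait for the ManifestWork to be applied and the status to follow.
	return true, nil
}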
