diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index c786ea7cb..f8240752c 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -22,7 +22,6 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs" podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod" - _ "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pv" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pvc" nodeserver "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/node-server" diff --git a/deploy/crds/kosmos.io_promotepolicies.yaml b/deploy/crds/kosmos.io_promotepolicies.yaml index f2e0ecd38..3b1d76e40 100644 --- a/deploy/crds/kosmos.io_promotepolicies.yaml +++ b/deploy/crds/kosmos.io_promotepolicies.yaml @@ -70,10 +70,16 @@ spec: type: string nullable: true type: array + rollback: + description: Rollback set true, then rollback from the backup + nullable: true + type: string type: object status: description: PromotePolicyStatus defines the observed state of promotePolicy properties: + backedupFile: + type: string completionTimestamp: description: CompletionTimestamp records the time a sync was completed. Completion time is recorded even on failed sync. The server's time diff --git a/pkg/apis/kosmos/v1alpha1/promotepolicy_types.go b/pkg/apis/kosmos/v1alpha1/promotepolicy_types.go index 384ee2af8..8ddaee5a2 100644 --- a/pkg/apis/kosmos/v1alpha1/promotepolicy_types.go +++ b/pkg/apis/kosmos/v1alpha1/promotepolicy_types.go @@ -35,6 +35,11 @@ type PromotePolicySpec struct { // +optional // +nullable ExcludedNamespaceScopedResources []string `json:"excludedNamespaceScopedResources,omitempty"` + + // Rollback set true, then rollback from the backup + // +optional + // +nullable + Rollback string `json:"rollback,omitempty"` } // PromotePolicyPhase is a string representation of the lifecycle phase @@ -65,6 +70,12 @@ const ( // PromotePolicyPhaseFailedRestore means restore has failed PromotePolicyPhaseFailedRestore PromotePolicyPhase = "FailedRestore" + // PromotePolicyPhaseFailedRollback means rollback has failed + PromotePolicyPhaseFailedRollback PromotePolicyPhase = "FailedRollback" + + // PromotePolicyPhaseRolledback means rollback has successed + PromotePolicyPhaseRolledback PromotePolicyPhase = "RolledBack" + // PromotePolicyPhaseCompleted means the sync has run successfully PromotePolicyPhaseCompleted PromotePolicyPhase = "Completed" ) @@ -118,6 +129,8 @@ type PromotePolicyStatus struct { // +optional // +nullable Progress *PromotePolicyProgress `json:"progress,omitempty"` + + BackedupFile string `json:"backedupFile,omitempty"` } // +genclient diff --git a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go index 9f90f5595..2a635d364 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/google/go-cmp/cmp" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -149,12 +148,13 @@ func (r *LeafPodReconciler) SetupWithManager(mgr manager.Manager) error { return skipFunc(createEvent.Object) }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - pod1 := updateEvent.ObjectOld.(*corev1.Pod) - pod2 := updateEvent.ObjectNew.(*corev1.Pod) - if !skipFunc(updateEvent.ObjectNew) { - return false - } - return !cmp.Equal(pod1.Status, pod2.Status) + return skipFunc(updateEvent.ObjectNew) + //pod1 := updateEvent.ObjectOld.(*corev1.Pod) + //pod2 := updateEvent.ObjectNew.(*corev1.Pod) + //if !skipFunc(updateEvent.ObjectNew) { + // return false + //} + //return !cmp.Equal(pod1.Status, pod2.Status) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return skipFunc(deleteEvent.Object) diff --git a/pkg/clustertree/cluster-manager/controllers/promote/backup/item_backupper.go b/pkg/clustertree/cluster-manager/controllers/promote/backup/item_backupper.go index 7f1a7d3d6..5593250b5 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/backup/item_backupper.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/backup/item_backupper.go @@ -2,7 +2,6 @@ package backup import ( "archive/tar" - "fmt" "time" "github.com/pkg/errors" @@ -18,6 +17,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/client" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/discovery" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/archive" ) @@ -68,8 +68,8 @@ func (ib *itemBackupper) backupItemInternal(obj runtime.Unstructured, groupResou namespace := metadata.GetNamespace() name := metadata.GetName() - key := requests.ItemKey{ - Resource: resourceKey(obj), + key := types.ItemKey{ + Resource: groupResource.String(), Namespace: namespace, Name: name, } @@ -189,10 +189,3 @@ func resourceVersion(obj runtime.Unstructured) string { gvk := obj.GetObjectKind().GroupVersionKind() return gvk.Version } - -// resourceKey returns a string representing the object's GroupVersionKind (e.g. -// apps/v1/Deployment). -func resourceKey(obj runtime.Unstructured) string { - gvk := obj.GetObjectKind().GroupVersionKind() - return fmt.Sprintf("%s/%s", gvk.GroupVersion().String(), gvk.Kind) -} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/backup/register_action.go b/pkg/clustertree/cluster-manager/controllers/promote/backup/register_action.go index 9bb02a7a5..bdecee744 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/backup/register_action.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/backup/register_action.go @@ -5,6 +5,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" ) // BackupItemAction is an actor that performs an operation on an individual item being backed up. @@ -34,7 +35,7 @@ func registerBackupActions() (map[string]BackupItemAction, error) { return actionMap, nil } -func registerBackupItemAction(actionsMap map[string]BackupItemAction, initializer requests.HandlerInitializer) error { +func registerBackupItemAction(actionsMap map[string]BackupItemAction, initializer types.HandlerInitializer) error { instance, err := initializer() if err != nil { return errors.WithMessage(err, "init backup action instance error") diff --git a/pkg/clustertree/cluster-manager/controllers/promote/detach/detach.go b/pkg/clustertree/cluster-manager/controllers/promote/detach/detach.go index de06d16d2..91fc5fce7 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/detach/detach.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/detach/detach.go @@ -7,6 +7,7 @@ import ( jsonpatch "github.com/evanphx/json-patch" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -17,29 +18,59 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/discovery" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/kuberesource" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/archive" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/filesystem" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/kube" ) // detach order, crd is detached first var defaultDetachPriorities = []schema.GroupResource{ - kuberesource.CustomResourceDefinitions, kuberesource.StatefulSets, kuberesource.Deployments, kuberesource.ReplicaSets, + kuberesource.Services, kuberesource.PersistentVolumeClaims, + kuberesource.PersistentVolumes, + kuberesource.ServiceAccounts, + kuberesource.Configmaps, + kuberesource.Secrets, + kuberesource.Roles, + kuberesource.RoleBindings, kuberesource.Pods, } +var defaultUndetachPriorities = []schema.GroupResource{ + kuberesource.Pods, + kuberesource.RoleBindings, + kuberesource.Roles, + kuberesource.Configmaps, + kuberesource.Secrets, + kuberesource.ServiceAccounts, + kuberesource.PersistentVolumes, + kuberesource.PersistentVolumeClaims, + kuberesource.Services, + kuberesource.ReplicaSets, + kuberesource.Deployments, + kuberesource.StatefulSets, +} + type kubernetesDetacher struct { - request *requests.PromoteRequest - discoveryHelper discovery.Helper - dynamicFactory client.DynamicFactory // used for connect leaf cluster - fileSystem filesystem.Interface - backupReader io.Reader - resourceClients map[resourceClientKey]client.Dynamic - detachDir string - actions map[string]DetachItemAction + request *requests.PromoteRequest + discoveryHelper discovery.Helper + dynamicFactory client.DynamicFactory // used for connect leaf cluster + fileSystem filesystem.Interface + backupReader io.Reader + resourceClients map[resourceClientKey]client.Dynamic + detachDir string + actions map[string]DetachItemAction + kosmosClusterName string + ownerItems map[ownerReferenceKey]struct{} +} + +type ownerReferenceKey struct { + apiVersion string + kind string } func NewKubernetesDetacher(request *requests.PromoteRequest, backupReader io.Reader) (*kubernetesDetacher, error) { @@ -54,13 +85,14 @@ func NewKubernetesDetacher(request *requests.PromoteRequest, backupReader io.Rea } return &kubernetesDetacher{ - request: request, - discoveryHelper: discoveryHelper, - dynamicFactory: dynamicFactory, - fileSystem: filesystem.NewFileSystem(), - backupReader: backupReader, - resourceClients: make(map[resourceClientKey]client.Dynamic), - actions: actions, + request: request, + discoveryHelper: discoveryHelper, + dynamicFactory: dynamicFactory, + fileSystem: filesystem.NewFileSystem(), + backupReader: backupReader, + resourceClients: make(map[resourceClientKey]client.Dynamic), + actions: actions, + kosmosClusterName: request.Spec.ClusterName, }, nil } @@ -101,6 +133,7 @@ func (d *kubernetesDetacher) Detach() error { // Need to set this for additionalItems to be restored. d.detachDir = dir + d.ownerItems = map[ownerReferenceKey]struct{}{} backupResources, err := archive.NewParser(d.fileSystem).Parse(d.detachDir) if err != nil { @@ -138,12 +171,12 @@ func (d *kubernetesDetacher) processSelectedResource(selectedResource detachable return errors.Wrap(err, "detachItem error") } - key := requests.ItemKey{ + item := types.ItemKey{ Resource: groupResource.String(), - Namespace: obj.GetNamespace(), - Name: obj.GetName(), + Name: selectedItem.name, + Namespace: selectedItem.targetNamespace, } - d.request.DetachedItems[key] = struct{}{} + d.request.DetachedItems[item] = struct{}{} } } return nil @@ -155,53 +188,137 @@ func (d *kubernetesDetacher) detachItem(obj *unstructured.Unstructured, groupRes return errors.Wrap(err, "getResourceClient error") } - if groupResource == kuberesource.StatefulSets || groupResource == kuberesource.Deployments || groupResource == kuberesource.ReplicaSets { - //级联删除sts、deployment、replicaset等 - orphanOption := metav1.DeletePropagationOrphan - if err = resourceClient.Delete(obj.GetName(), metav1.DeleteOptions{PropagationPolicy: &orphanOption}); err != nil { - return errors.Wrap(err, "DeletePropagationOrphan err") - } - } else if action, ok := d.actions[groupResource.String()]; ok { - err := action.Execute(obj, resourceClient) + klog.Infof("detach resource %s, name: %s, namespace: %s", groupResource.String(), obj.GetName(), obj.GetNamespace()) + + if action, ok := d.actions[groupResource.String()]; ok { + err := action.Execute(obj, resourceClient, d) if err != nil { return errors.Errorf("%s detach action error: %v", groupResource.String(), err) } + return nil } else { - // todo check if the gr provided was a custom resource - customResource := false - if customResource { - updatedObj := obj.DeepCopy() - res, ok := updatedObj.Object["metadata"] - if !ok { - return errors.New("metadata not found") + klog.Infof("no action found for resource %s, delete it", groupResource.String()) + updatedOwnerObj := obj.DeepCopy() + if updatedOwnerObj.GetFinalizers() != nil { + updatedOwnerObj.SetFinalizers(nil) + patchBytes, err := generatePatch(obj, updatedOwnerObj) + if err != nil { + return errors.Wrap(err, "error generating patch") } - metadata, ok := res.(map[string]interface{}) - if !ok { - return errors.Errorf("metadata was of type %T, expected map[string]interface{}", res) + + _, err = resourceClient.Patch(updatedOwnerObj.GetName(), patchBytes) + if err != nil { + return errors.Wrapf(err, "error patch %s %s", groupResource.String(), updatedOwnerObj.GetName()) } + } - if _, ok := metadata["finalizers"]; ok { - delete(metadata, "finalizers") - patchBytes, err := generatePatch(obj, updatedObj) - if err != nil { - return errors.Wrap(err, "error generating patch") - } - if patchBytes == nil { - klog.Warningf("the same crd obj, %s", updatedObj.GetName()) + deleteGraceSeconds := int64(0) + err = resourceClient.Delete(updatedOwnerObj.GetName(), metav1.DeleteOptions{GracePeriodSeconds: &deleteGraceSeconds}) + if err != nil { + return errors.Wrapf(err, "error delete %s %s", groupResource.String(), updatedOwnerObj.GetName()) + } + } + return nil +} + +func (d *kubernetesDetacher) Rollback(allDetached bool) error { + dir, err := archive.NewExtractor(d.fileSystem).UnzipAndExtractBackup(d.backupReader) + if err != nil { + return errors.Errorf("error unzipping and extracting: %v", err) + } + defer func() { + if err := d.fileSystem.RemoveAll(dir); err != nil { + klog.Errorf("error removing temporary directory %s: %s", dir, err.Error()) + } + }() + + d.detachDir = dir + d.ownerItems = map[ownerReferenceKey]struct{}{} + backupedResources, err := archive.NewParser(d.fileSystem).Parse(d.detachDir) + if err != nil { + return errors.Errorf("error parse detachDir %s: %v", d.detachDir, err) + } + klog.Infof("total backup resources size: %v", len(backupedResources)) + + resourceCollection, err := d.getOrderedResourceCollection(backupedResources, defaultUndetachPriorities) + if err != nil { + return err + } + + for _, selectedResource := range resourceCollection { + err = d.rollbackSelectedResource(selectedResource, allDetached) + if err != nil { + return err + } + } + + return nil +} + +func (d *kubernetesDetacher) rollbackSelectedResource(selectedResource detachableResource, allDetached bool) error { + groupResource := schema.ParseGroupResource(selectedResource.resource) + + for _, selectedItems := range selectedResource.selectedItemsByNamespace { + for _, selectedItem := range selectedItems { + if !allDetached { + item := types.ItemKey{ + Resource: groupResource.String(), + Name: selectedItem.name, + Namespace: selectedItem.targetNamespace, } - klog.Infof("delete finalizers for %s", updatedObj.GetName()) - _, err = resourceClient.Patch(updatedObj.GetName(), patchBytes) - if err != nil { - return err + if _, ok := d.request.DetachedItems[item]; !ok { + // undetached resource, doesn't need to handle + continue } + } - klog.Infof("delete cr %s", updatedObj.GetName()) - return resourceClient.Delete(updatedObj.GetName(), metav1.DeleteOptions{}) + obj, err := archive.Unmarshal(d.fileSystem, selectedItem.path) + if err != nil { + return errors.Errorf("error decoding %q: %v", strings.Replace(selectedItem.path, d.detachDir+"/", "", -1), err) + } + + err = d.undetachItem(obj, groupResource, selectedItem.targetNamespace) + if err != nil { + return errors.Wrap(err, "UndetachItem error") } } } + return nil +} + +func (d *kubernetesDetacher) undetachItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) error { + resourceClient, err := d.getResourceClient(groupResource, obj, namespace) + if err != nil { + return errors.Wrap(err, "getResourceClient error") + } + klog.Infof("Undetach resource %s, name: %s", groupResource.String(), obj.GetName()) + + if action, ok := d.actions[groupResource.String()]; ok { + err = action.Revert(obj, resourceClient, d) + if err != nil { + return errors.Errorf("%s Undetach action error: %v", groupResource.String(), err) + } + + return nil + } else { + klog.Infof("no action found for resource %s, create it immediately", groupResource.String()) + newObj := obj.DeepCopy() + newObj, err := kube.ResetMetadataAndStatus(newObj) + if err != nil { + return errors.Wrapf(err, "reset %s %s metadata error", obj.GroupVersionKind().String(), obj.GetName()) + } + + _, err = resourceClient.Create(newObj) + if err != nil { + if apierrors.IsAlreadyExists(err) { + klog.Infof("resource %s is already exist. skip create", newObj.GetName()) + return nil + } + return errors.Wrap(err, "create resource "+newObj.GetName()+" failed.") + } + } return nil } @@ -237,6 +354,31 @@ func (d *kubernetesDetacher) getOrderedResourceCollection( detachResourceCollection = append(detachResourceCollection, res) } } + + for owner := range d.ownerItems { + klog.Infof("ownerReference: %s %s", owner.apiVersion, owner.kind) + gvk := schema.FromAPIVersionAndKind(owner.apiVersion, owner.kind) + gvr, _, err := d.discoveryHelper.KindFor(gvk) + if err != nil { + return nil, errors.Wrapf(err, "resource %s cannot be resolved via discovery", gvk.String()) + } + + resourceList := backupResources[gvr.GroupResource().String()] + if resourceList == nil { + klog.Infof("Skipping restore of resource %s because it's not present in the backup tarball", gvr.GroupResource().String()) + continue + } + + for namespace, items := range resourceList.ItemsByNamespace { + res, err := d.getSelectedDetachableItems(gvr.GroupResource().String(), namespace, items) + if err != nil { + return nil, err + } + + detachResourceCollection = append(detachResourceCollection, res) + } + } + return detachResourceCollection, nil } @@ -280,6 +422,17 @@ func (d *kubernetesDetacher) getSelectedDetachableItems(resource string, namespa detachable.selectedItemsByNamespace[namespace] = append(detachable.selectedItemsByNamespace[namespace], selectedItem) detachable.totalItems++ + + if resource == kuberesource.StatefulSets.String() || resource == kuberesource.Deployments.String() { + for _, owner := range obj.GetOwnerReferences() { + ownerKey := ownerReferenceKey{ + apiVersion: owner.APIVersion, + kind: owner.Kind, + } + + d.ownerItems[ownerKey] = struct{}{} + } + } } return detachable, nil } diff --git a/pkg/clustertree/cluster-manager/controllers/promote/detach/pod_detach_action.go b/pkg/clustertree/cluster-manager/controllers/promote/detach/pod_detach_action.go index bb16ff0f9..9a75ff698 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/detach/pod_detach_action.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/detach/pod_detach_action.go @@ -3,6 +3,8 @@ package detach import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog" @@ -18,11 +20,11 @@ func NewPodAction() *PodAction { return &PodAction{} } -func (p *PodAction) Resource() string { - return "pods" +func (p *PodAction) Resource() []string { + return []string{"pods"} } -func (p *PodAction) Execute(obj *unstructured.Unstructured, client client.Dynamic) error { +func (p *PodAction) Execute(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error { updatedPod := new(corev1.Pod) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, updatedPod); err != nil { return err @@ -40,16 +42,57 @@ func (p *PodAction) Execute(obj *unstructured.Unstructured, client client.Dynami if err != nil { return errors.Wrap(err, "unable to convert pod to unstructured item") } - patchBytes, err := generatePatch(obj, &unstructured.Unstructured{Object: podMap}) if err != nil { return errors.Wrap(err, "error generating patch") } if patchBytes == nil { klog.Warningf("the same pod obj, %s", updatedPod.Name) + return nil } _, err = client.Patch(updatedPod.Name, patchBytes) return err } } + +func (p *PodAction) Revert(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error { + fromCluster, err := client.Get(obj.GetName(), metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("resource %s %s not found. skip undetach", obj.GroupVersionKind().String(), obj.GetName()) + return nil + } else { + return errors.Wrapf(err, "get resource %s %s failed.", obj.GroupVersionKind().String(), obj.GetName()) + } + } + + updatedPod := new(corev1.Pod) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(fromCluster.Object, updatedPod); err != nil { + return err + } + labels := updatedPod.GetLabels() + if labels != nil { + if _, ok := labels["kosmos-io/pod"]; ok { + delete(labels, "kosmos-io/pod") + updatedPod.SetLabels(labels) + podMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedPod) + if err != nil { + return errors.Wrap(err, "unable to convert pod to unstructured item") + } + patchBytes, err := generatePatch(fromCluster, &unstructured.Unstructured{Object: podMap}) + if err != nil { + return errors.Wrap(err, "error generating patch") + } + if patchBytes == nil { + klog.Warningf("the same pod obj, %s", updatedPod.Name) + return nil + } + + _, err = client.Patch(updatedPod.Name, patchBytes) + return err + } + } + + return nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/detach/pvc_detach_action.go b/pkg/clustertree/cluster-manager/controllers/promote/detach/pvc_detach_action.go deleted file mode 100644 index b383eec49..000000000 --- a/pkg/clustertree/cluster-manager/controllers/promote/detach/pvc_detach_action.go +++ /dev/null @@ -1,57 +0,0 @@ -package detach - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - - "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/client" -) - -// SecretAction is a restore item action for secrets -type PvcAction struct { - logger logrus.FieldLogger -} - -func NewPvcAction() *PvcAction { - return &PvcAction{} -} - -func (p *PvcAction) Resource() string { - return "persistentvolumeclaims" -} - -func (p *PvcAction) Execute(obj *unstructured.Unstructured, client client.Dynamic) error { - updatedPvc := new(corev1.PersistentVolumeClaim) - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, updatedPvc); err != nil { - return err - } - - annotations := updatedPvc.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) - } - if _, ok := annotations["kosmos.io/global"]; ok { - return nil - } else { - annotations["kosmos.io/global"] = "true" - - pvcMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedPvc) - if err != nil { - return errors.Wrap(err, "unable to convert pvc to unstructured item") - } - - patchBytes, err := generatePatch(obj, &unstructured.Unstructured{Object: pvcMap}) - if err != nil { - return errors.Wrap(err, "error generating patch") - } - if patchBytes == nil { - p.logger.Warnf("the same pvc obj, %s", updatedPvc.Name) - } - - _, err = client.Patch(updatedPvc.Name, patchBytes) - return err - } -} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/detach/register_action.go b/pkg/clustertree/cluster-manager/controllers/promote/detach/register_action.go index 1b6ca7f47..e3e29400c 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/detach/register_action.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/detach/register_action.go @@ -5,19 +5,21 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/client" - "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" ) // BackupItemAction is an actor that performs an operation on an individual item being backed up. type DetachItemAction interface { // return resource.group - Resource() string + Resource() []string // Execute allows the ItemAction to perform arbitrary logic with the item being backed up, // including mutating the item itself prior to backup. The item (unmodified or modified) // should be returned, along with an optional slice of ResourceIdentifiers specifying // additional related items that should be backed up. - Execute(obj *unstructured.Unstructured, client client.Dynamic) error + Execute(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error + + Revert(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error } func registerDetachActions() (map[string]DetachItemAction, error) { @@ -26,13 +28,16 @@ func registerDetachActions() (map[string]DetachItemAction, error) { if err := registerDetachItemAction(actionMap, newPodDetachItemAction); err != nil { return nil, err } - if err := registerDetachItemAction(actionMap, newPvcDetachItemAction); err != nil { + if err := registerDetachItemAction(actionMap, newUniversalDetachItemAction); err != nil { + return nil, err + } + if err := registerDetachItemAction(actionMap, newStsDeployDetachItemAction); err != nil { return nil, err } return actionMap, nil } -func registerDetachItemAction(actionsMap map[string]DetachItemAction, initializer requests.HandlerInitializer) error { +func registerDetachItemAction(actionsMap map[string]DetachItemAction, initializer types.HandlerInitializer) error { instance, err := initializer() if err != nil { return errors.WithMessage(err, "init restore action instance error") @@ -42,7 +47,9 @@ func registerDetachItemAction(actionsMap map[string]DetachItemAction, initialize if !ok { return errors.Errorf("%T is not a detach item action", instance) } - actionsMap[itemAction.Resource()] = itemAction + for _, resource := range itemAction.Resource() { + actionsMap[resource] = itemAction + } return nil } @@ -50,6 +57,10 @@ func newPodDetachItemAction() (interface{}, error) { return NewPodAction(), nil } -func newPvcDetachItemAction() (interface{}, error) { - return NewPvcAction(), nil +func newUniversalDetachItemAction() (interface{}, error) { + return NewUniversalAction(), nil +} + +func newStsDeployDetachItemAction() (interface{}, error) { + return NewStsDeployAction(), nil } diff --git a/pkg/clustertree/cluster-manager/controllers/promote/detach/sts_deploy_detach_action.go b/pkg/clustertree/cluster-manager/controllers/promote/detach/sts_deploy_detach_action.go new file mode 100644 index 000000000..d22b68c79 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/detach/sts_deploy_detach_action.go @@ -0,0 +1,59 @@ +package detach + +import ( + "time" + + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog" + + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/client" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/kube" +) + +type StsDeployAction struct { +} + +func NewStsDeployAction() *StsDeployAction { + return &StsDeployAction{} +} + +func (p *StsDeployAction) Resource() []string { + return []string{"statefulsets.apps", "deployments.apps", "replicasets.apps"} +} + +func (p *StsDeployAction) Execute(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error { + //级联删除sts、deployment、replicaset等 + orphanOption := metav1.DeletePropagationOrphan + err := client.Delete(obj.GetName(), metav1.DeleteOptions{PropagationPolicy: &orphanOption}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("resource %s not found, skip delete", obj.GetName()) + return nil + } else { + return errors.Wrap(err, "DeletePropagationOrphan err") + } + } + return nil +} + +func (p *StsDeployAction) Revert(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error { + newObj := obj.DeepCopy() + newObj, err := kube.ResetMetadataAndStatus(newObj) + if err != nil { + return errors.Wrapf(err, "reset %s %s metadata error", obj.GroupVersionKind().String(), obj.GetName()) + } + + _, err = client.Create(newObj) + if err != nil { + if apierrors.IsAlreadyExists(err) { + klog.Infof("resource %s is already exist. skip create", newObj.GetName()) + return nil + } + return errors.Wrap(err, "create resource "+newObj.GetName()+" failed.") + } + time.Sleep(5 * time.Second) + return nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/detach/universal_detach_action.go b/pkg/clustertree/cluster-manager/controllers/promote/detach/universal_detach_action.go new file mode 100644 index 000000000..a79ed0728 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/detach/universal_detach_action.go @@ -0,0 +1,112 @@ +package detach + +import ( + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/client" +) + +type UniversalAction struct { +} + +func NewUniversalAction() *UniversalAction { + return &UniversalAction{} +} + +func (p *UniversalAction) Resource() []string { + return []string{"services", "persistentvolumeclaims", "persistentvolumes", "configmaps", "secrets", "serviceaccounts", + "roles.rbac.authorization.k8s.io", "rolebindings.rbac.authorization.k8s.io"} +} + +func (p *UniversalAction) Execute(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error { + updatedObj := obj.DeepCopy() + objectMeta, err := meta.Accessor(updatedObj) + if err != nil { + return errors.WithStack(err) + } + + annotations := objectMeta.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + var key, val string + if obj.GetKind() == "Service" { + key = "kosmos.io/auto-create-mcs" + val = "true" + } else { + key = "kosmos-io/cluster-owners" + val = detacher.kosmosClusterName + } + + _, ok := annotations[key] + if updatedObj.GetOwnerReferences() != nil || !ok { + annotations[key] = val + updatedObj.SetAnnotations(annotations) + updatedObj.SetOwnerReferences(nil) + patchBytes, err := generatePatch(obj, updatedObj) + if err != nil { + return errors.Wrap(err, "error generating patch") + } + if patchBytes == nil { + klog.Warningf("the same obj, %s", objectMeta.GetName()) + } + + _, err = client.Patch(objectMeta.GetName(), patchBytes) + return err + } + + return nil +} + +//nolint:gosec // No need to check. +func (p *UniversalAction) Revert(obj *unstructured.Unstructured, client client.Dynamic, detacher *kubernetesDetacher) error { + fromCluster, err := client.Get(obj.GetName(), metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("resource %s not found. skip undetach", obj.GroupVersionKind().String(), obj.GetName()) + return nil + } else { + return errors.Wrapf(err, "get resource %s %s failed.", obj.GroupVersionKind().String(), obj.GetName()) + } + } + + updatedObj := fromCluster.DeepCopy() + objectMeta, err := meta.Accessor(updatedObj) + if err != nil { + return errors.WithStack(err) + } + + annotations := objectMeta.GetAnnotations() + if annotations != nil { + var key string + if obj.GetKind() == "Service" { + key = "kosmos.io/auto-create-mcs" + } else { + key = "kosmos-io/cluster-owners" + } + + if _, ok := annotations[key]; ok { + delete(annotations, key) + updatedObj.SetAnnotations(annotations) + patchBytes, err := generatePatch(fromCluster, updatedObj) + if err != nil { + return errors.Wrap(err, "error generating patch") + } + if patchBytes == nil { + klog.Warningf("the same obj, %s", objectMeta.GetName()) + return nil + } + + _, err = client.Patch(objectMeta.GetName(), patchBytes) + return err + } + } + + return nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/discovery/helper.go b/pkg/clustertree/cluster-manager/controllers/promote/discovery/helper.go index d49bd3f15..0ecb6cba5 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/discovery/helper.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/discovery/helper.go @@ -180,6 +180,8 @@ func (h *helper) Refresh() error { for _, resource := range resourceGroup.APIResources { gvr := gv.WithResource(resource.Name) gvk := gv.WithKind(resource.Kind) + resource.Group = gv.Group + resource.Version = gv.Version h.resourcesMap[gvr] = resource h.kindMap[gvk] = resource } diff --git a/pkg/clustertree/cluster-manager/controllers/promote/kuberesource/kuberesource.go b/pkg/clustertree/cluster-manager/controllers/promote/kuberesource/kuberesource.go index f8ed3d60a..a4f065697 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/kuberesource/kuberesource.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/kuberesource/kuberesource.go @@ -23,17 +23,21 @@ import ( var ( ClusterRoleBindings = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings"} ClusterRoles = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterroles"} + RoleBindings = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "rolebindings"} + Roles = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "roles"} CustomResourceDefinitions = schema.GroupResource{Group: "apiextensions.k8s.io", Resource: "customresourcedefinitions"} Jobs = schema.GroupResource{Group: "batch", Resource: "jobs"} Namespaces = schema.GroupResource{Group: "", Resource: "namespaces"} PersistentVolumeClaims = schema.GroupResource{Group: "", Resource: "persistentvolumeclaims"} PersistentVolumes = schema.GroupResource{Group: "", Resource: "persistentvolumes"} Pods = schema.GroupResource{Group: "", Resource: "pods"} + Configmaps = schema.GroupResource{Group: "", Resource: "configmaps"} ServiceAccounts = schema.GroupResource{Group: "", Resource: "serviceaccounts"} Secrets = schema.GroupResource{Group: "", Resource: "secrets"} StatefulSets = schema.GroupResource{Group: "apps", Resource: "statefulsets"} Deployments = schema.GroupResource{Group: "apps", Resource: "deployments"} ReplicaSets = schema.GroupResource{Group: "apps", Resource: "replicasets"} + Services = schema.GroupResource{Group: "", Resource: "services"} VolumeSnapshotClasses = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshotclasses"} VolumeSnapshots = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshots"} VolumeSnapshotContents = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshotcontents"} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go b/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go index 37e6d4545..2585bd3d6 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go @@ -24,6 +24,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/precheck" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/restore" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/collections" leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" ) @@ -52,84 +53,157 @@ func (p *PromotePolicyController) Reconcile(ctx context.Context, request reconci original := &v1alpha1.PromotePolicy{} if err := p.RootClient.Get(ctx, request.NamespacedName, original); err != nil { if apierrors.IsNotFound(err) { - klog.Infof("syncleaf %s not found", original.Name) + klog.Infof("promotepolicy %s not found", original.Name) return ctrl.Result{}, nil } - klog.Errorf("error getting syncleaf %s: %v", original.Name, err) + klog.Errorf("error getting promotepolicy %s: %v", original.Name, err) return ctrl.Result{}, nil } - switch original.Status.Phase { - case "": - // only process new backups - default: - klog.Infof("syncleaf %s is not handled", original.Name) - return ctrl.Result{}, nil - } - - lr, err := p.GlobalLeafManager.GetLeafResourceByNodeName(original.Spec.ClusterName) + lr, err := p.GlobalLeafManager.GetLeafResourceByNodeName("kosmos-" + original.Spec.ClusterName) if err != nil { // wait for leaf resource init - klog.Errorf("Error get leaf %s resource. %v", original.Spec.ClusterName, err) + klog.Errorf("Error get kosmos leaf %s resource. %v", original.Spec.ClusterName, err) return reconcile.Result{RequeueAfter: RequeueTime}, nil } promoteRequest, err := p.preparePromoteRequest(original, lr) if err != nil { - return reconcile.Result{}, fmt.Errorf("error prepareSyncRequest: %v", err) + return reconcile.Result{}, fmt.Errorf("error prepare promoteRequest: %v", err) + } + + switch original.Status.Phase { + case "": + //create promotepolicy request + case v1alpha1.PromotePolicyPhaseCompleted, v1alpha1.PromotePolicyPhaseFailedRollback: + // check if Rollback request + if original.Spec.Rollback == "true" { + klog.Info("rollback start...") + promoteRequest.Spec.Rollback = "" + err = DetachRollback(promoteRequest, original.Status.BackedupFile, true) + if err != nil { + klog.Errorf("rollback detached resources err: %s", err.Error()) + promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedRollback + promoteRequest.Status.FailureReason = err.Error() + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) + } + return ctrl.Result{}, nil + } + + err = RestoreRollback(promoteRequest, original.Status.BackedupFile, true) + if err != nil { + klog.Errorf("rollback restored resources err: %s", err.Error()) + promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedRollback + promoteRequest.Status.FailureReason = err.Error() + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) + } + return ctrl.Result{}, nil + } + + promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseRolledback + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) + } + } + + return ctrl.Result{}, nil + default: + klog.Infof("promotePolicy %s status %s will not handled", original.Name, original.Status.Phase) + return ctrl.Result{}, nil } err = runPrecheck(promoteRequest) if err != nil { + klog.Errorf("precheck err: %s", err.Error()) promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedPrecheck promoteRequest.Status.FailureReason = err.Error() - if err := p.RootClient.Patch(context.TODO(), promoteRequest.PromotePolicy, client.MergeFrom(original)); err != nil { - klog.Errorf("error updating syncleaf %s final status", original.Name) + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) } return reconcile.Result{}, err } backupFile, err := runBackup(promoteRequest) if err != nil { + klog.Errorf("backup resources err: %s", err.Error()) promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedBackup promoteRequest.Status.FailureReason = err.Error() - if err := p.RootClient.Patch(context.TODO(), promoteRequest.PromotePolicy, client.MergeFrom(original)); err != nil { - klog.Errorf("error updating syncleaf %s final status", original.Name) + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) } return reconcile.Result{}, err } + klog.Infof("backup success. file: %s", backupFile) + + promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseDetach + promoteRequest.Status.BackedupFile = backupFile + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) + } err = runDetach(promoteRequest, backupFile) if err != nil { + klog.Errorf("detach resources err: %s", err.Error()) promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedDetach promoteRequest.Status.FailureReason = err.Error() - err := p.RootClient.Patch(context.TODO(), promoteRequest.PromotePolicy, client.MergeFrom(original)) + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) + } + + klog.Warning("Begin rollback detached resources because detach stage failed.") + time.Sleep(5 * time.Second) + err = DetachRollback(promoteRequest, backupFile, false) if err != nil { - klog.Errorf("error updating syncleaf %s final status", original.Name) + klog.Errorf("rollback detached resource err: %s", err.Error()) + } else { + klog.Info("all detached resource rollback suceess.") } return reconcile.Result{}, err } err = runRestore(promoteRequest, backupFile) if err != nil { + klog.Errorf("restore resources err: %s", err.Error()) promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedRestore promoteRequest.Status.FailureReason = err.Error() - err := p.RootClient.Patch(context.TODO(), promoteRequest.PromotePolicy, client.MergeFrom(original)) + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) + } + + klog.Warning("Begin rollback detached and restored resources because restore stage failed.") + time.Sleep(5 * time.Second) + err = DetachRollback(promoteRequest, backupFile, true) + if err != nil { + klog.Errorf("rollback detached resource err: %s", err.Error()) + } else { + klog.Info("all detached resource rollback suceess.") + } + + err = RestoreRollback(promoteRequest, backupFile, false) if err != nil { - klog.Errorf("error updating syncleaf %s final status", original.Name) + klog.Errorf("rollback restored resource err: %s", err.Error()) + } else { + klog.Info("all restored resource rollback suceess.") } return reconcile.Result{}, err } promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseCompleted - err = p.RootClient.Patch(context.TODO(), promoteRequest.PromotePolicy, client.MergeFrom(original)) - if err != nil { - klog.Errorf("error updating syncleaf %s final status", original.Name) + if err = p.updateStatus(original, promoteRequest.PromotePolicy); err != nil { + return reconcile.Result{}, errors.Wrapf(err, "error updating promotepolicy %s to status %s", original.Name, promoteRequest.Status.Phase) } + klog.Infof("Create promotePolicy %s completed", original.Name) + return reconcile.Result{}, nil } +func (p *PromotePolicyController) updateStatus(original *v1alpha1.PromotePolicy, updatedObj *v1alpha1.PromotePolicy) error { + return p.RootClient.Patch(context.TODO(), updatedObj, client.MergeFrom(original)) +} + func (p *PromotePolicyController) preparePromoteRequest(promote *v1alpha1.PromotePolicy, lf *leafUtils.LeafResource) (*requests.PromoteRequest, error) { // todo validate params @@ -142,9 +216,9 @@ func (p *PromotePolicyController) preparePromoteRequest(promote *v1alpha1.Promot LeafDynamicClient: lf.DynamicClient, LeafDiscoveryClient: lf.DiscoveryClient, NamespaceIncludesExcludes: collections.NewIncludesExcludes().Includes(promote.Spec.IncludedNamespaces...).Excludes(promote.Spec.ExcludedNamespaces...), - BackedUpItems: make(map[requests.ItemKey]struct{}), - DetachedItems: make(map[requests.ItemKey]struct{}), - RestoredItems: make(map[requests.ItemKey]struct{}), + BackedUpItems: make(map[types.ItemKey]struct{}), + DetachedItems: make(map[types.ItemKey]struct{}), + RestoredItems: make(map[types.ItemKey]types.RestoredItemStatus), ForbidNamespaces: p.PromotePolicyOptions.ForbidNamespaces, } return request, nil @@ -166,8 +240,8 @@ func runPrecheck(promoteRequest *requests.PromoteRequest) error { } func runBackup(promoteRequest *requests.PromoteRequest) (file string, err error) { - klog.Info("Setting up backup temp file") - filePath := constants.BackupDir + time.Now().Format("20060102-150405") + klog.Info("start backup resources") + filePath := constants.BackupDir + promoteRequest.Name + time.Now().Format("20060102-150405") backupFile, err := os.Create(filePath) if err != nil { return "", errors.Wrap(err, "error creating temp file for backup") @@ -208,8 +282,44 @@ func runDetach(promoteRequest *requests.PromoteRequest, backupfile string) error return nil } +func DetachRollback(promoteRequest *requests.PromoteRequest, backupfile string, detachSuccess bool) error { + backupReader, err := os.Open(backupfile) + if err != nil { + panic(err) + } + defer backupReader.Close() + + detacher, err := detach.NewKubernetesDetacher(promoteRequest, backupReader) + if err != nil { + return errors.Wrap(err, "error new detach instance") + } + + err = detacher.Rollback(detachSuccess) + if err != nil { + return errors.Wrap(err, "error detach") + } + return nil +} + +func RestoreRollback(promoteRequest *requests.PromoteRequest, backupfile string, restoreSuccess bool) error { + backupReader, err := os.Open(backupfile) + if err != nil { + panic(err) + } + defer backupReader.Close() + + restorer, err := restore.NewKubernetesRestorer(promoteRequest, backupReader) + if err != nil { + return errors.Wrap(err, "error new restore instance") + } + err = restorer.Rollback(restoreSuccess) + if err != nil { + return errors.Wrap(err, "error restore") + } + return nil +} + func runRestore(promoteRequest *requests.PromoteRequest, backupfile string) error { - // 打开压缩文件 backupReader, err := os.Open(backupfile) if err != nil { panic(err) diff --git a/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go b/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go index 04c512c50..9e2fbf8b5 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go @@ -30,18 +30,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/collections" ) -// HandlerInitializer is a function that initializes and returns a new instance of one of action interfaces -type HandlerInitializer func() (interface{}, error) - -type ItemKey struct { - Resource string - Namespace string - Name string -} - type PromoteRequest struct { *v1alpha1.PromotePolicy RootClient client.Client @@ -55,9 +47,9 @@ type PromoteRequest struct { ResourceIncludesExcludes collections.IncludesExcludesInterface NamespaceIncludesExcludes *collections.IncludesExcludes - BackedUpItems map[ItemKey]struct{} - DetachedItems map[ItemKey]struct{} - RestoredItems map[ItemKey]struct{} + BackedUpItems map[types.ItemKey]struct{} + DetachedItems map[types.ItemKey]struct{} + RestoredItems map[types.ItemKey]types.RestoredItemStatus ForbidNamespaces []string } diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/pod_restore_action.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/pod_restore_action.go index 846f4e3c4..cb9b3b6dd 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/restore/pod_restore_action.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/pod_restore_action.go @@ -5,8 +5,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - - "github.com/kosmos.io/kosmos/pkg/utils" ) type PodAction struct { @@ -16,8 +14,8 @@ func NewPodAction() *PodAction { return &PodAction{} } -func (p *PodAction) Resource() string { - return "pods" +func (p *PodAction) Resource() []string { + return []string{"pods"} } func (p *PodAction) Execute(obj *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { @@ -26,37 +24,38 @@ func (p *PodAction) Execute(obj *unstructured.Unstructured, restorer *kubernetes return nil, errors.Wrap(err, "unable to convert unstructured item to pod") } - updatedPod.Spec.NodeName = restorer.clusterNodeName - kosmosTolerationExist := false + updatedPod.Spec.NodeName = restorer.kosmosNodeName - labels := updatedPod.Labels - if labels == nil { - labels = make(map[string]string) + kosmosNodeToleration := corev1.Toleration{ + Key: "kosmos.io/node", + Value: "true", + Operator: corev1.TolerationOpEqual, + Effect: corev1.TaintEffectNoSchedule, } - labels[utils.KosmosPodLabel] = "true" - labels["kosmos/promoted"] = "true" - tolerations := updatedPod.Spec.Tolerations if tolerations == nil { tolerations = make([]corev1.Toleration, 1) - } - - for _, toleration := range tolerations { - if toleration.Key == "kosmos.io/node" { - kosmosTolerationExist = true - break + tolerations[0] = kosmosNodeToleration + } else { + kosmosTolerationExist := false + for _, toleration := range tolerations { + if toleration.Key == "kosmos.io/node" { + kosmosTolerationExist = true + break + } + } + if !kosmosTolerationExist { + updatedPod.Spec.Tolerations = append(updatedPod.Spec.Tolerations, kosmosNodeToleration) } } - if !kosmosTolerationExist { - kosmosNodeToleration := corev1.Toleration{ - Key: "kosmos.io/node", - Value: "true", - Operator: corev1.TolerationOpEqual, - Effect: corev1.TaintEffectNoSchedule, - } - updatedPod.Spec.Tolerations = append(updatedPod.Spec.Tolerations, kosmosNodeToleration) + labels := updatedPod.GetLabels() + if labels == nil { + labels = make(map[string]string) } + labels["kosmos-io/pod"] = "true" + labels["kosmos-io/synced"] = "true" + updatedPod.SetLabels(labels) podMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedPod) if err != nil { @@ -64,3 +63,25 @@ func (p *PodAction) Execute(obj *unstructured.Unstructured, restorer *kubernetes } return &unstructured.Unstructured{Object: podMap}, nil } + +func (p *PodAction) Revert(fromCluster *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + updatedPod := new(corev1.Pod) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(fromCluster.Object, updatedPod); err != nil { + return nil, errors.Wrap(err, "unable to convert unstructured item to pod") + } + + labels := updatedPod.GetLabels() + if labels != nil { + if _, ok := labels["kosmos-io/pod"]; ok { + delete(labels, "kosmos-io/pod") + updatedPod.SetLabels(labels) + podMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedPod) + if err != nil { + return nil, errors.Wrap(err, "unable to convert pod to unstructured item") + } + return &unstructured.Unstructured{Object: podMap}, nil + } + } + + return fromCluster, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/pv_restore_action.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/pv_restore_action.go index 5545aebf0..adc5c231a 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/restore/pv_restore_action.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/pv_restore_action.go @@ -17,8 +17,8 @@ func NewPvAction() *PvAction { return &PvAction{} } -func (p *PvAction) Resource() string { - return "persistentvolumes" +func (p *PvAction) Resource() []string { + return []string{"persistentvolumes"} } func (p *PvAction) Execute(obj *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { @@ -57,3 +57,25 @@ func (p *PvAction) Execute(obj *unstructured.Unstructured, restorer *kubernetesR return obj, nil } } + +func (p *PvAction) Revert(fromCluster *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + updatedPv := new(corev1.PersistentVolume) + + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(fromCluster.UnstructuredContent(), updatedPv); err != nil { + return nil, errors.Wrap(err, "unable to convert unstructured item to pv") + } + + annotations := updatedPv.Annotations + if annotations != nil { + if _, ok := annotations["kosmos-io/cluster-owners"]; ok { + delete(annotations, "kosmos-io/cluster-owners") + pvMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedPv) + if err != nil { + return nil, errors.Wrap(err, "unable to convert pod to unstructured item") + } + return &unstructured.Unstructured{Object: pvMap}, nil + } + } + + return fromCluster, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/register_action.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/register_action.go index a913ffd3f..5a04125cb 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/restore/register_action.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/register_action.go @@ -4,18 +4,20 @@ import ( "github.com/pkg/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" ) type RestoreItemAction interface { // return resource.group - Resource() string + Resource() []string // Execute allows the ItemAction to perform arbitrary logic with the item being backed up, // including mutating the item itself prior to backup. The item (unmodified or modified) // should be returned, along with an optional slice of ResourceIdentifiers specifying // additional related items that should be backed up. Execute(obj *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) + + Revert(fromCluster *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) } func registerRestoreActions() (map[string]RestoreItemAction, error) { @@ -31,10 +33,25 @@ func registerRestoreActions() (map[string]RestoreItemAction, error) { return nil, errors.WithMessage(err, "register pv restore action error") } + err = registerRestoreItemAction(actionMap, newStsDeployRestoreItemAction) + if err != nil { + return nil, errors.WithMessage(err, "register sts/deploy restore action error") + } + + err = registerRestoreItemAction(actionMap, newServiceRestoreItemAction) + if err != nil { + return nil, errors.WithMessage(err, "register service restore action error") + } + + err = registerRestoreItemAction(actionMap, newUniversalRestoreItemAction) + if err != nil { + return nil, errors.WithMessage(err, "register universal restore action error") + } + return actionMap, nil } -func registerRestoreItemAction(actionsMap map[string]RestoreItemAction, initializer requests.HandlerInitializer) error { +func registerRestoreItemAction(actionsMap map[string]RestoreItemAction, initializer types.HandlerInitializer) error { instance, err := initializer() if err != nil { return errors.WithMessage(err, "init restore action instance error") @@ -44,7 +61,9 @@ func registerRestoreItemAction(actionsMap map[string]RestoreItemAction, initiali if !ok { return errors.Errorf("%T is not a backup item action", instance) } - actionsMap[itemAction.Resource()] = itemAction + for _, resource := range itemAction.Resource() { + actionsMap[resource] = itemAction + } return nil } @@ -55,3 +74,15 @@ func newPodRestoreItemAction() (interface{}, error) { func newPvRestoreItemAction() (interface{}, error) { return NewPvAction(), nil } + +func newStsDeployRestoreItemAction() (interface{}, error) { + return NewStsDeployAction(), nil +} + +func newServiceRestoreItemAction() (interface{}, error) { + return NewServiceAction(), nil +} + +func newUniversalRestoreItemAction() (interface{}, error) { + return NewUniversalAction(), nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/restore.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/restore.go index 7a6fd762b..25cf00c57 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/restore/restore.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/restore.go @@ -13,7 +13,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" @@ -25,6 +24,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/discovery" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/kuberesource" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/types" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/archive" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/filesystem" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/kube" @@ -51,17 +51,8 @@ High priorities: have pod volume restores run before controllers adopt the pods. - Replica sets go before deployments/other controllers so they can be explicitly restored and be adopted by controllers. - - CAPI ClusterClasses go before Clusters. - - Endpoints go before Services so no new Endpoints will be created - Services go before Clusters so they can be adopted by AKO-operator and no new Services will be created for the same clusters - -Low priorities: - - Tanzu ClusterBootstraps go last as it can reference any other kind of resources. - - ClusterBootstraps go before CAPI Clusters otherwise a new default ClusterBootstrap object is created for the cluster - - CAPI Clusters come before ClusterResourceSets because failing to do so means the CAPI controller-manager will panic. - Both Clusters and ClusterResourceSets need to come before ClusterResourceSetBinding in order to properly restore workload clusters. - See https://github.com/kubernetes-sigs/cluster-api/issues/4105 */ var defaultRestorePriorities = Priorities{ HighPriorities: []string{ @@ -70,17 +61,14 @@ var defaultRestorePriorities = Priorities{ "persistentvolumeclaims", "persistentvolumes", "serviceaccounts", + "roles.rbac.authorization.k8s.io", + "rolebindings.rbac.authorization.k8s.io", "secrets", "configmaps", - "limitranges", "pods", - // we fully qualify replicasets.apps because prior to Kubernetes 1.16, replicasets also - // existed in the extensions API group, but we back up replicasets from "apps" so we want - // to ensure that we prioritize restoring from "apps" too, since this is how they're stored - // in the backup. "replicasets.apps", "deployments.apps", - //"endpoints", + "statefulsets.apps", "services", }, LowPriorities: []string{}, @@ -88,18 +76,18 @@ var defaultRestorePriorities = Priorities{ // kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster. type kubernetesRestorer struct { + request *requests.PromoteRequest discoveryHelper discovery.Helper dynamicFactory client.DynamicFactory fileSystem filesystem.Interface restoreDir string actions map[string]RestoreItemAction namespaceClient corev1.NamespaceInterface - restoredItems map[requests.ItemKey]restoredItemStatus resourceClients map[resourceClientKey]client.Dynamic resourceTerminatingTimeout time.Duration - resourcePriorities Priorities backupReader io.Reader - clusterNodeName string + kosmosClusterName string + kosmosNodeName string } // restoreableResource represents map of individual items of each resource @@ -136,15 +124,15 @@ func NewKubernetesRestorer(request *requests.PromoteRequest, backupReader io.Rea return nil, err } return &kubernetesRestorer{ + request: request, discoveryHelper: discoveryHelper, dynamicFactory: dynamicFactory, namespaceClient: request.RootClientSet.CoreV1().Namespaces(), resourceTerminatingTimeout: 10 * time.Minute, fileSystem: filesystem.NewFileSystem(), - resourcePriorities: defaultRestorePriorities, backupReader: backupReader, - clusterNodeName: request.Spec.ClusterName, - restoredItems: make(map[requests.ItemKey]restoredItemStatus), + kosmosClusterName: request.Spec.ClusterName, + kosmosNodeName: "kosmos-" + request.Spec.ClusterName, resourceClients: make(map[resourceClientKey]client.Dynamic), actions: actions, }, nil @@ -183,35 +171,27 @@ func (kr *kubernetesRestorer) Restore() error { klog.Infof("total backup resources size: %v", len(backupResources)) // totalItems: previously discovered items, i: iteration counter. - totalItems, processedItems, existingNamespaces := 0, 0, sets.KeySet(make(map[string]struct{})) + processedItems, existingNamespaces := 0, sets.KeySet(make(map[string]struct{})) - // First restore CRDs. This is needed so that they are available in the cluster - // when getOrderedResourceCollection is called again on the whole backup and - // needs to validate all resources listed. - crdResourceCollection, processedResources, err := kr.getOrderedResourceCollection( + klog.Infof("Restore everything order by defaultRestorePriorities") + // Restore everything else + selectedResourceCollection, _, err := kr.getOrderedResourceCollection( backupResources, make([]restoreableResource, 0), sets.KeySet(make(map[string]string)), - Priorities{HighPriorities: []string{"customresourcedefinitions"}}, - false, + defaultRestorePriorities, + true, ) if err != nil { return errors.Wrap(err, "getOrderedResourceCollection err") } - klog.Infof("crdResourceCollection size: %s", len(crdResourceCollection)) + klog.Infof("resource collection size: %s", len(selectedResourceCollection)) - for _, selectedResource := range crdResourceCollection { - totalItems += selectedResource.totalItems - } - - for _, selectedResource := range crdResourceCollection { - // Restore this resource, the update channel is set to nil, to avoid misleading value of "totalItems" - // more details see #5990 - klog.Infof("restore source: %s", selectedResource.resource) + for _, selectedResource := range selectedResourceCollection { + // Restore this resource processedItems, err = kr.processSelectedResource( selectedResource, - totalItems, processedItems, existingNamespaces, ) @@ -220,32 +200,59 @@ func (kr *kubernetesRestorer) Restore() error { } } - klog.Infof("Restore everything else") + return nil +} + +func (kr *kubernetesRestorer) Rollback(allRestored bool) error { + dir, err := archive.NewExtractor(kr.fileSystem).UnzipAndExtractBackup(kr.backupReader) + if err != nil { + return errors.Errorf("error unzipping and extracting: %v", err) + } + defer func() { + if err := kr.fileSystem.RemoveAll(dir); err != nil { + klog.Errorf("error removing temporary directory %s: %s", dir, err.Error()) + } + }() + + // Need to set this for additionalItems to be restored. + kr.restoreDir = dir + + backupResources, err := archive.NewParser(kr.fileSystem).Parse(kr.restoreDir) + // If ErrNotExist occurs, it implies that the backup to be restored includes zero items. + // Need to add a warning about it and jump out of the function. + if errors.Cause(err) == archive.ErrNotExist { + return errors.Wrap(err, "zero items to be restored") + } + if err != nil { + return errors.Wrap(err, "error parsing backup contents") + } + + klog.Infof("total backup resources size: %v", len(backupResources)) + + var highProprites []string + highProprites = append(highProprites, defaultRestorePriorities.HighPriorities...) + reversSlice(highProprites) + unestorePriorities := Priorities{ + HighPriorities: highProprites, + LowPriorities: defaultRestorePriorities.LowPriorities, + } - // Restore everything else selectedResourceCollection, _, err := kr.getOrderedResourceCollection( backupResources, - crdResourceCollection, - processedResources, - kr.resourcePriorities, + make([]restoreableResource, 0), + sets.KeySet(make(map[string]string)), + unestorePriorities, true, ) if err != nil { return errors.Wrap(err, "getOrderedResourceCollection err") } - klog.Infof("Restore everything size: %v", len(selectedResourceCollection)) - for _, selectedResource := range selectedResourceCollection { // Restore this resource - processedItems, err = kr.processSelectedResource( - selectedResource, - totalItems, - processedItems, - existingNamespaces, - ) + err = kr.deleteSelectedResource(selectedResource, allRestored) if err != nil { - return errors.Wrap(err, "processSelectedResource err") + return errors.Wrap(err, "deleteSelectedResource err") } } @@ -320,7 +327,6 @@ func (kr *kubernetesRestorer) getOrderedResourceCollection( // in the expected total restore count. func (kr *kubernetesRestorer) processSelectedResource( selectedResource restoreableResource, - totalItems int, processedItems int, existingNamespaces sets.Set[string], ) (int, error) { @@ -352,12 +358,12 @@ func (kr *kubernetesRestorer) processSelectedResource( // Add the newly created namespace to the list of restored items. if nsCreated { - itemKey := requests.ItemKey{ - Resource: resourceKey(ns), + itemKey := types.ItemKey{ + Resource: groupResource.String(), Namespace: ns.Namespace, Name: ns.Name, } - kr.restoredItems[itemKey] = restoredItemStatus{action: constants.ItemRestoreResultCreated, itemExists: true} + kr.request.RestoredItems[itemKey] = types.RestoredItemStatus{Action: constants.ItemRestoreResultCreated, ItemExists: true} } // Keep track of namespaces that we know exist so we don't @@ -388,6 +394,41 @@ func (kr *kubernetesRestorer) processSelectedResource( return processedItems, nil } +func (kr *kubernetesRestorer) deleteSelectedResource(selectedResource restoreableResource, allRestored bool) error { + groupResource := schema.ParseGroupResource(selectedResource.resource) + + for _, selectedItems := range selectedResource.selectedItemsByNamespace { + for _, selectedItem := range selectedItems { + obj, err := archive.Unmarshal(kr.fileSystem, selectedItem.path) + if err != nil { + if err != nil { + return errors.Errorf("error decoding %q: %v", strings.Replace(selectedItem.path, kr.restoreDir+"/", "", -1), err) + } + } + + if !allRestored { + item := types.ItemKey{ + Resource: groupResource.String(), + Name: selectedItem.name, + Namespace: selectedItem.targetNamespace, + } + + if _, ok := kr.request.RestoredItems[item]; !ok { + // unrestored resource, doesn't need to handle + continue + } + } + + _, err = kr.deleteItem(obj, groupResource, selectedItem.targetNamespace) + if err != nil { + return errors.Wrap(err, "deleteItem error") + } + } + } + + return nil +} + // getSelectedRestoreableItems applies Kubernetes selectors on individual items // of each resource type to create a list of items which will be actually // restored. @@ -437,16 +478,6 @@ func (kr *kubernetesRestorer) restoreItem(obj *unstructured.Unstructured, groupR itemExists := false resourceID := getResourceID(groupResource, namespace, obj.GetName()) - //restoreLogger := ctx.log.WithFields(logrus.Fields{ - // "namespace": obj.GetNamespace(), - // "name": obj.GetName(), - // "groupResource": groupResource.String(), - //}) - - // Check if group/resource should be restored. We need to do this here since - // this method may be getting called for an additional item which is a group/resource - // that's excluded. - if namespace != "" { nsToEnsure := getNamespace(archive.GetItemFilePath(kr.restoreDir, "namespaces", "", obj.GetNamespace()), namespace) _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, kr.namespaceClient, kr.resourceTerminatingTimeout) @@ -455,12 +486,12 @@ func (kr *kubernetesRestorer) restoreItem(obj *unstructured.Unstructured, groupR } // Add the newly created namespace to the list of restored items. if nsCreated { - itemKey := requests.ItemKey{ - Resource: resourceKey(nsToEnsure), + itemKey := types.ItemKey{ + Resource: groupResource.String(), Namespace: nsToEnsure.Namespace, Name: nsToEnsure.Name, } - kr.restoredItems[itemKey] = restoredItemStatus{action: constants.ItemRestoreResultCreated, itemExists: true} + kr.request.RestoredItems[itemKey] = types.RestoredItemStatus{Action: constants.ItemRestoreResultCreated, ItemExists: true} } } @@ -476,33 +507,27 @@ func (kr *kubernetesRestorer) restoreItem(obj *unstructured.Unstructured, groupR name := obj.GetName() // Check if we've already restored this itemKey. - itemKey := requests.ItemKey{ - Resource: resourceKey(obj), + itemKey := types.ItemKey{ + Resource: groupResource.String(), Namespace: namespace, Name: name, } - if prevRestoredItemStatus, exists := kr.restoredItems[itemKey]; exists { + if prevRestoredItemStatus, exists := kr.request.RestoredItems[itemKey]; exists { klog.Infof("Skipping %s because it's already been restored.", resourceID) - itemExists = prevRestoredItemStatus.itemExists + itemExists = prevRestoredItemStatus.ItemExists return itemExists, nil } - kr.restoredItems[itemKey] = restoredItemStatus{itemExists: itemExists} + kr.request.RestoredItems[itemKey] = types.RestoredItemStatus{ItemExists: itemExists} defer func() { - itemStatus := kr.restoredItems[itemKey] + itemStatus := kr.request.RestoredItems[itemKey] // the action field is set explicitly - if len(itemStatus.action) > 0 { + if len(itemStatus.Action) > 0 { return } - // no action specified, and no warnings and errors - //if errs.IsEmpty() && warnings.IsEmpty() { - // itemStatus.action = itemRestoreResultSkipped - // kr.restoredItems[itemKey] = itemStatus - // return - //} // others are all failed - itemStatus.action = constants.ItemRestoreResultFailed - kr.restoredItems[itemKey] = itemStatus + itemStatus.Action = constants.ItemRestoreResultFailed + kr.request.RestoredItems[itemKey] = itemStatus }() if action, ok := kr.actions[groupResource.String()]; ok { @@ -514,17 +539,10 @@ func (kr *kubernetesRestorer) restoreItem(obj *unstructured.Unstructured, groupR //objStatus, statusFieldExists, statusFieldErr := unstructured.NestedFieldCopy(obj.Object, "status") // Clear out non-core metadata fields and status. - if obj, err = resetMetadataAndStatus(obj); err != nil { + if obj, err = kube.ResetMetadataAndStatus(obj); err != nil { return itemExists, err } - // Necessary because we may have remapped the namespace if the namespace is - // blank, don't create the key. - //originalNamespace := obj.GetNamespace() - //if namespace != "" { - // obj.SetNamespace(namespace) - //} - // The object apiVersion might get modified by a RestorePlugin so we need to // get a new client to reflect updated resource path. newGR := schema.GroupResource{Group: obj.GroupVersionKind().Group, Resource: groupResource.Resource} @@ -542,7 +560,7 @@ func (kr *kubernetesRestorer) restoreItem(obj *unstructured.Unstructured, groupR _, restoreErr = resourceClient.Create(obj) if restoreErr == nil { itemExists = true - kr.restoredItems[itemKey] = restoredItemStatus{action: constants.ItemRestoreResultCreated, itemExists: itemExists} + kr.request.RestoredItems[itemKey] = types.RestoredItemStatus{Action: constants.ItemRestoreResultCreated, ItemExists: itemExists} } // Error was something other than an AlreadyExists. @@ -557,6 +575,70 @@ func (kr *kubernetesRestorer) restoreItem(obj *unstructured.Unstructured, groupR return itemExists, nil } +func (kr *kubernetesRestorer) deleteItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (bool, error) { + // Check if we've already restored this itemKey. + itemKey := types.ItemKey{ + Resource: groupResource.String(), + Namespace: namespace, + Name: obj.GetName(), + } + + // The object apiVersion might get modified by a RestorePlugin so we need to + // get a new client to reflect updated resource path. + resourceClient, err := kr.getResourceClient(groupResource, obj, obj.GetNamespace()) + if err != nil { + return false, errors.Errorf("error getting updated resource client for namespace %q, resource %q: %v", namespace, &groupResource, err) + } + + if action, ok := kr.actions[groupResource.String()]; ok { + klog.Infof("Attempting to revert %s: %v", obj.GroupVersionKind().Kind, obj.GetName()) + fromCluster, err := resourceClient.Get(obj.GetName(), metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("resource %s not found. skip unstore", obj.GroupVersionKind().String(), obj.GetName()) + return true, nil + } else { + return false, errors.Wrapf(err, "get resource %s %s failed.", obj.GroupVersionKind().String(), obj.GetName()) + } + } + + updatedObj, err := action.Revert(fromCluster, kr) + if err != nil { + return false, errors.Errorf("error revert %s action: %v", groupResource.String(), err) + } + + patchBytes, err := kube.GeneratePatch(fromCluster, updatedObj) + if err != nil { + return false, errors.Wrap(err, "error generating patch") + } + if patchBytes == nil { + klog.Infof("the same obj %s. skipped patch", obj.GetName()) + } else { + _, err = resourceClient.Patch(obj.GetName(), patchBytes) + if err != nil { + return false, errors.Wrapf(err, "patch %s error", obj.GetName()) + } + } + } + + klog.Infof("Deleting %s: %v", obj.GroupVersionKind().Kind, obj.GetName()) + deleteOptions := metav1.DeleteOptions{} + if groupResource == kuberesource.Pods { + graceDeleteSecond := int64(0) + deleteOptions = metav1.DeleteOptions{GracePeriodSeconds: &graceDeleteSecond} + } + err = resourceClient.Delete(obj.GetName(), deleteOptions) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("delete %s %s error because resource not found.", obj.GroupVersionKind().String(), obj.GetName()) + } else { + klog.Errorf("error delete delete %s %s. %s", obj.GroupVersionKind().String(), obj.GetName(), err.Error()) + } + } + delete(kr.request.RestoredItems, itemKey) + return true, nil +} + func (kr *kubernetesRestorer) getResourceClient(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (client.Dynamic, error) { key := getResourceClientKey(groupResource, obj.GroupVersionKind().Version, namespace) @@ -623,40 +705,6 @@ func getResourceID(groupResource schema.GroupResource, namespace, name string) s return fmt.Sprintf("%s/%s/%s", groupResource.String(), namespace, name) } -func resetStatus(obj *unstructured.Unstructured) { - unstructured.RemoveNestedField(obj.UnstructuredContent(), "status") -} - -func resetMetadataAndStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - _, err := resetMetadata(obj) - if err != nil { - return nil, err - } - resetStatus(obj) - return obj, nil -} - -func resetMetadata(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - res, ok := obj.Object["metadata"] - if !ok { - return nil, errors.New("metadata not found") - } - metadata, ok := res.(map[string]interface{}) - if !ok { - return nil, errors.Errorf("metadata was of type %T, expected map[string]interface{}", res) - } - - for k := range metadata { - switch k { - case "generateName", "selfLink", "uid", "resourceVersion", "generation", "creationTimestamp", "deletionTimestamp", - "deletionGracePeriodSeconds", "ownerReferences": - delete(metadata, k) - } - } - - return obj, nil -} - // getNamespace returns a namespace API object that we should attempt to // create before restoring anything into it. It will come from the backup // tarball if it exists, else will be a new one. If from the tarball, it @@ -734,12 +782,9 @@ func getOrderedResources(resourcePriorities Priorities, backupResources map[stri return append(list, resourcePriorities.LowPriorities...) } -func resourceKey(obj runtime.Object) string { - gvk := obj.GetObjectKind().GroupVersionKind() - return fmt.Sprintf("%s/%s", gvk.GroupVersion().String(), gvk.Kind) -} - -type restoredItemStatus struct { - action string - itemExists bool +// ReversSlice reverse the slice +func reversSlice(s []string) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } } diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/service_restore_action.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/service_restore_action.go new file mode 100644 index 000000000..c00dc6329 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/service_restore_action.go @@ -0,0 +1,68 @@ +package restore + +import ( + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +type ServiceAction struct { +} + +func NewServiceAction() *ServiceAction { + return &ServiceAction{} +} + +func (p *ServiceAction) Resource() []string { + return []string{"services"} +} + +func (p *ServiceAction) Execute(obj *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + updatedService := new(corev1.Service) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, updatedService); err != nil { + return nil, errors.Wrap(err, "unable to convert unstructured item to service") + } + + if updatedService.Spec.ClusterIP != "None" { + updatedService.Spec.ClusterIP = "" + updatedService.Spec.ClusterIPs = nil + } + + annotations := updatedService.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + if _, ok := annotations["kosmos.io/auto-create-mcs"]; !ok { + annotations["kosmos.io/auto-create-mcs"] = "true" + } + updatedService.SetAnnotations(annotations) + + serviceMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedService) + if err != nil { + return nil, errors.Wrap(err, "unable to convert pod to unstructured item") + } + return &unstructured.Unstructured{Object: serviceMap}, nil +} + +func (p *ServiceAction) Revert(fromCluster *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + updatedService := new(corev1.Service) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(fromCluster.Object, updatedService); err != nil { + return nil, errors.Wrap(err, "unable to convert unstructured item to service") + } + + annotations := updatedService.GetAnnotations() + if annotations != nil { + if _, ok := annotations["kosmos.io/auto-create-mcs"]; ok { + delete(annotations, "kosmos.io/auto-create-mcs") + updatedService.SetAnnotations(annotations) + serviceMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedService) + if err != nil { + return nil, errors.Wrap(err, "unable to convert service to unstructured item") + } + return &unstructured.Unstructured{Object: serviceMap}, nil + } + } + + return fromCluster, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/sts_deploy_restore_action.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/sts_deploy_restore_action.go new file mode 100644 index 000000000..3068c0724 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/sts_deploy_restore_action.go @@ -0,0 +1,128 @@ +package restore + +import ( + "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +type StsDeployAction struct { +} + +func NewStsDeployAction() *StsDeployAction { + return &StsDeployAction{} +} + +func (p *StsDeployAction) Resource() []string { + return []string{"statefulsets.apps", "deployments.apps"} +} + +//nolint:gosec // No need to check. +func (p *StsDeployAction) Execute(obj *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + _ = &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: corev1.NodeSelectorOpIn, + Values: []string{restorer.kosmosNodeName}, + }, + }, + }, + }, + }, + } + + kosmosToleration := corev1.Toleration{ + Key: "kosmos.io/node", + Operator: corev1.TolerationOpEqual, + Value: "true", + Effect: corev1.TaintEffectNoSchedule, + } + + var updatedObj interface{} + + if obj.GetKind() == "Deployment" { + updatedDeploy := new(appsv1.Deployment) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, updatedDeploy); err != nil { + return nil, errors.Wrap(err, "unable to convert unstructured item to deployment") + } + + //affinity := updatedDeploy.Spec.Template.Spec.Affinity + //if affinity == nil { + // affinity = &corev1.Affinity{ + // NodeAffinity: updatedNodeAffinity, + // } + //} else { + // updatedDeploy.Spec.Template.Spec.Affinity.NodeAffinity = updatedNodeAffinity + //} + + tolerations := updatedDeploy.Spec.Template.Spec.Tolerations + if tolerations == nil { + tolerations = make([]corev1.Toleration, 1) + tolerations[0] = kosmosToleration + } else { + kosmosExist := false + for _, toleration := range tolerations { + if toleration.Key == kosmosToleration.Key { + kosmosExist = true + break + } + } + + if !kosmosExist { + updatedDeploy.Spec.Template.Spec.Tolerations = append(tolerations, kosmosToleration) + } + } + updatedObj = updatedDeploy + } else if obj.GetKind() == "StatefulSet" { + updatedSts := new(appsv1.StatefulSet) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, updatedSts); err != nil { + return nil, errors.Wrap(err, "unable to convert unstructured item to statefulset") + } + + //affinity := updatedSts.Spec.Template.Spec.Affinity + //if affinity == nil { + // affinity = &corev1.Affinity{ + // NodeAffinity: updatedNodeAffinity, + // } + //} else { + // updatedSts.Spec.Template.Spec.Affinity.NodeAffinity = updatedNodeAffinity + //} + + tolerations := updatedSts.Spec.Template.Spec.Tolerations + if tolerations == nil { + tolerations = make([]corev1.Toleration, 1) + tolerations[0] = kosmosToleration + } else { + kosmosExist := false + for _, toleration := range tolerations { + if toleration.Key == kosmosToleration.Key { + kosmosExist = true + break + } + } + + if !kosmosExist { + updatedSts.Spec.Template.Spec.Tolerations = append(tolerations, kosmosToleration) + } + } + updatedObj = updatedSts + } else { + return nil, errors.Errorf("unknow obj kind %s", obj.GetKind()) + } + + stsMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&updatedObj) + if err != nil { + return nil, errors.Wrap(err, "unable to convert sts/deploy to unstructured item") + } + return &unstructured.Unstructured{Object: stsMap}, nil +} + +func (p *StsDeployAction) Revert(fromCluster *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + return fromCluster, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/restore/universal_restore_action.go b/pkg/clustertree/cluster-manager/controllers/promote/restore/universal_restore_action.go new file mode 100644 index 000000000..54388c2bf --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/restore/universal_restore_action.go @@ -0,0 +1,56 @@ +package restore + +import ( + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +type UniversalAction struct { +} + +func NewUniversalAction() *UniversalAction { + return &UniversalAction{} +} + +func (p *UniversalAction) Resource() []string { + return []string{"persistentvolumeclaims", "configmaps", "secrets", "serviceaccounts", "roles.rbac.authorization.k8s.io", "rolebindings.rbac.authorization.k8s.io"} +} + +func (p *UniversalAction) Execute(obj *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + updatedObj := obj.DeepCopy() + objectMeta, err := meta.Accessor(updatedObj) + if err != nil { + return nil, errors.WithStack(err) + } + + annotations := objectMeta.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + if _, ok := annotations["kosmos-io/cluster-owners"]; !ok { + annotations["kosmos-io/cluster-owners"] = restorer.kosmosClusterName + updatedObj.SetAnnotations(annotations) + } + + return updatedObj, nil +} + +func (p *UniversalAction) Revert(fromCluster *unstructured.Unstructured, restorer *kubernetesRestorer) (*unstructured.Unstructured, error) { + updatedObj := fromCluster.DeepCopy() + objectMeta, err := meta.Accessor(updatedObj) + if err != nil { + return nil, errors.WithStack(err) + } + + annotations := objectMeta.GetAnnotations() + if annotations != nil { + if _, ok := annotations["kosmos-io/cluster-owners"]; ok { + delete(annotations, "kosmos-io/cluster-owners") + updatedObj.SetAnnotations(annotations) + } + } + + return updatedObj, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/types/types.go b/pkg/clustertree/cluster-manager/controllers/promote/types/types.go new file mode 100644 index 000000000..a1d7e3d64 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/types/types.go @@ -0,0 +1,15 @@ +package types + +// HandlerInitializer is a function that initializes and returns a new instance of one of action interfaces +type HandlerInitializer func() (interface{}, error) + +type ItemKey struct { + Resource string + Namespace string + Name string +} + +type RestoredItemStatus struct { + Action string + ItemExists bool +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/utils/kube/kube_utils.go b/pkg/clustertree/cluster-manager/controllers/promote/utils/kube/kube_utils.go index ce1b7c16a..e9bb3bc0a 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/utils/kube/kube_utils.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/utils/kube/kube_utils.go @@ -21,14 +21,17 @@ import ( "fmt" "time" + jsonpatch "github.com/evanphx/json-patch" "github.com/pkg/errors" corev1api "k8s.io/api/core/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -101,6 +104,40 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core return true, nsCreated, nil } +func resetStatus(obj *unstructured.Unstructured) { + unstructured.RemoveNestedField(obj.UnstructuredContent(), "status") +} + +func ResetMetadataAndStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + _, err := resetMetadata(obj) + if err != nil { + return nil, err + } + resetStatus(obj) + return obj, nil +} + +func resetMetadata(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + res, ok := obj.Object["metadata"] + if !ok { + return nil, errors.New("metadata not found") + } + metadata, ok := res.(map[string]interface{}) + if !ok { + return nil, errors.Errorf("metadata was of type %T, expected map[string]interface{}", res) + } + + for k := range metadata { + switch k { + case "generateName", "selfLink", "uid", "resourceVersion", "generation", "creationTimestamp", "deletionTimestamp", + "deletionGracePeriodSeconds", "ownerReferences": + delete(metadata, k) + } + } + + return obj, nil +} + // IsV1CRDReady checks a v1 CRD to see if it's ready, with both the Established and NamesAccepted conditions. func IsV1CRDReady(crd *apiextv1.CustomResourceDefinition) bool { var isEstablished, namesAccepted bool @@ -153,3 +190,27 @@ func IsCRDReady(crd *unstructured.Unstructured) (bool, error) { return false, fmt.Errorf("unable to handle CRD with version %s", ver) } } + +func GeneratePatch(fromCluster, desired *unstructured.Unstructured) ([]byte, error) { + // If the objects are already equal, there's no need to generate a patch. + if equality.Semantic.DeepEqual(fromCluster, desired) { + return nil, nil + } + + desiredBytes, err := json.Marshal(desired.Object) + if err != nil { + return nil, errors.Wrap(err, "unable to marshal desired object") + } + + fromClusterBytes, err := json.Marshal(fromCluster.Object) + if err != nil { + return nil, errors.Wrap(err, "unable to marshal in-cluster object") + } + + patchBytes, err := jsonpatch.CreateMergePatch(fromClusterBytes, desiredBytes) + if err != nil { + return nil, errors.Wrap(err, "unable to create merge patch") + } + + return patchBytes, nil +} diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index e4d28250f..1466984d8 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -2059,6 +2059,13 @@ func schema_pkg_apis_kosmos_v1alpha1_PromotePolicySpec(ref common.ReferenceCallb }, }, }, + "rollback": { + SchemaProps: spec.SchemaProps{ + Description: "Rollback set true, then rollback from the backup", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, @@ -2119,6 +2126,12 @@ func schema_pkg_apis_kosmos_v1alpha1_PromotePolicyStatus(ref common.ReferenceCal Ref: ref("github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.PromotePolicyProgress"), }, }, + "backedupFile": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, }, },