diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index 5a77d7f4a..1ad98cda5 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -226,23 +226,23 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("error starting rootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err) } - if !opts.OnewayStorageControllers { - rootPVCController := pvc.RootPVCController{ - RootClient: mgr.GetClient(), - GlobalLeafManager: globalleafManager, - } - if err := rootPVCController.SetupWithManager(mgr); err != nil { - return fmt.Errorf("error starting root pvc controller %v", err) - } + rootPVCController := pvc.RootPVCController{ + RootClient: mgr.GetClient(), + GlobalLeafManager: globalleafManager, + } + if err := rootPVCController.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting root pvc controller %v", err) + } - rootPVController := pv.RootPVController{ - RootClient: mgr.GetClient(), - GlobalLeafManager: globalleafManager, - } - if err := rootPVController.SetupWithManager(mgr); err != nil { - return fmt.Errorf("error starting root pv controller %v", err) - } - } else { + rootPVController := pv.RootPVController{ + RootClient: mgr.GetClient(), + GlobalLeafManager: globalleafManager, + } + if err := rootPVController.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting root pv controller %v", err) + } + + if len(os.Getenv("USE-ONEWAY-STORAGE")) > 0 { onewayPVController := pv.OnewayPVController{ Root: mgr.GetClient(), RootDynamic: dynamicClient, diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index 5dcd48a43..f294e817f 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -303,11 +303,9 @@ func (c *ClusterController) setupControllers( return fmt.Errorf("error starting podUpstreamReconciler %s: %v", podcontrollers.LeafPodControllerName, err) } - if !c.Options.OnewayStorageControllers { - err := c.setupStorageControllers(mgr, utils.IsOne2OneMode(cluster), cluster.Name) - if err != nil { - return err - } + err := c.setupStorageControllers(mgr, utils.IsOne2OneMode(cluster), cluster.Name) + if err != nil { + return err } return nil diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index 85667161a..b193aec6a 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -739,14 +739,18 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea // pvc go func() { if err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { - if !r.Options.OnewayStorageControllers { - klog.V(4).Info("Trying to creating dependent pvc") - if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcs, basicPod, clusterNodeInfo); err != nil { - klog.Error(err) - return false, nil - } - klog.V(4).Infof("Create pvc %v of %v/%v success", pvcs, basicPod.Namespace, basicPod.Name) + pvcsWithoutEs, err := podutils.NoESPVCFilter(ctx, r.DynamicRootClient, pvcs, basicPod.Namespace) + if err != nil { + klog.Error(err) + return false, err + } + klog.V(4).Info("Trying to creating dependent pvc") + if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcsWithoutEs, basicPod, clusterNodeInfo); err != nil { + klog.Error(err) + return false, nil } + klog.V(4).Infof("Create pvc %v of %v/%v success", pvcsWithoutEs, basicPod.Namespace, basicPod.Name) + // } return true, nil }); err != nil { ch <- fmt.Sprintf("create pvc failed: %v", err) diff --git a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go index f41ab3406..4c47da2a2 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go @@ -21,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -177,13 +178,16 @@ func (l *LeafPVController) SetupWithManager(mgr manager.Manager) error { WithOptions(controller.Options{}). For(&v1.PersistentVolume{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { - return true + curr := createEvent.Object.(*v1.PersistentVolume) + return !podutils.IsESPV(curr) }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - return true + curr := updateEvent.ObjectNew.(*v1.PersistentVolume) + return !podutils.IsESPV(curr) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - return true + curr := deleteEvent.Object.(*v1.PersistentVolume) + return !podutils.IsESPV(curr) }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false diff --git a/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go index 65e6ff62d..9d79612c0 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go @@ -8,7 +8,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/klog" @@ -23,21 +22,15 @@ import ( leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( controllerName = "oneway-pv-controller" requeueTime = 10 * time.Second quickRequeueTime = 3 * time.Second - csiDriverName = "infini.volumepath.csi" ) -var VolumePathGVR = schema.GroupVersionResource{ - Version: "v1alpha1", - Group: "lvm.infinilabs.com", - Resource: "volumepaths", -} - type OnewayPVController struct { Root client.Client RootDynamic dynamic.Interface @@ -48,15 +41,15 @@ func (c *OnewayPVController) SetupWithManager(mgr manager.Manager) error { predicatesFunc := predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { curr := createEvent.Object.(*corev1.PersistentVolume) - return curr.Spec.CSI != nil && curr.Spec.CSI.Driver == csiDriverName + return podutils.IsESPV(curr) }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { curr := updateEvent.ObjectNew.(*corev1.PersistentVolume) - return curr.Spec.CSI != nil && curr.Spec.CSI.Driver == csiDriverName + return podutils.IsESPV(curr) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { curr := deleteEvent.Object.(*corev1.PersistentVolume) - return curr.Spec.CSI != nil && curr.Spec.CSI.Driver == csiDriverName + return podutils.IsESPV(curr) }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false @@ -84,7 +77,7 @@ func (c *OnewayPVController) Reconcile(ctx context.Context, request reconcile.Re } // volumePath has the same name with pv - vp, err := c.RootDynamic.Resource(VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{}) + vp, err := c.RootDynamic.Resource(podutils.VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { klog.V(4).Infof("vp %s not found", request.Name) diff --git a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go index 0936497e7..ed2b9e57b 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go @@ -18,6 +18,7 @@ import ( leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -52,6 +53,11 @@ func (r *RootPVController) SetupWithManager(mgr manager.Manager) error { } pv := deleteEvent.Object.(*v1.PersistentVolume) + // skip es pv, oneway_pv_controller will handle this PV + if podutils.IsESPV(pv) { + return false + } + clusters := utils.ListResourceClusters(pv.Annotations) if len(clusters) == 0 { klog.Warningf("pv leaf %q doesn't existed", deleteEvent.Object.GetName()) diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go index 821ee7687..1839824f5 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go @@ -126,7 +126,7 @@ func (l *LeafPVCController) SetupWithManager(mgr manager.Manager) error { }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { pvc := updateEvent.ObjectOld.(*v1.PersistentVolumeClaim) - return utils.IsObjectGlobal(&pvc.ObjectMeta) + return utils.IsObjectGlobal(&pvc.ObjectMeta) && !podutils.IsESPVC(pvc) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go index c6f005a46..fe8fa040b 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go @@ -8,7 +8,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/klog" @@ -23,50 +22,33 @@ import ( leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( - controllerName = "oneway-pvc-controller" - requeueTime = 10 * time.Second - vpAnnotationKey = "volumepath" + controllerName = "oneway-pvc-controller" + requeueTime = 10 * time.Second ) -var VolumePathGVR = schema.GroupVersionResource{ - Version: "v1alpha1", - Group: "lvm.infinilabs.com", - Resource: "volumepaths", -} - type OnewayPVCController struct { Root client.Client RootDynamic dynamic.Interface GlobalLeafManager leafUtils.LeafResourceManager } -func pvcEventFilter(pvc *corev1.PersistentVolumeClaim) bool { - anno := pvc.GetAnnotations() - if anno == nil { - return false - } - if _, ok := anno[vpAnnotationKey]; ok { - return true - } - return false -} - func (c *OnewayPVCController) SetupWithManager(mgr manager.Manager) error { predicatesFunc := predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { curr := createEvent.Object.(*corev1.PersistentVolumeClaim) - return pvcEventFilter(curr) + return podutils.IsESPVC(curr) }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { curr := updateEvent.ObjectNew.(*corev1.PersistentVolumeClaim) - return pvcEventFilter(curr) + return podutils.IsESPVC(curr) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { curr := deleteEvent.Object.(*corev1.PersistentVolumeClaim) - return pvcEventFilter(curr) + return podutils.IsESPVC(curr) }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false @@ -93,7 +75,7 @@ func (c *OnewayPVCController) Reconcile(ctx context.Context, request reconcile.R } // volumePath has the same name with pvc - vp, err := c.RootDynamic.Resource(VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{}) + vp, err := c.RootDynamic.Resource(podutils.VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { klog.V(4).Infof("vp %s not found", request.Name) diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go index eba645ceb..f7dd36615 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go @@ -21,6 +21,7 @@ import ( leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -106,7 +107,9 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error { return false }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - return true + // skip es pvc, oneway_pv_controller will handle this PVC + curr := updateEvent.ObjectNew.(*v1.PersistentVolumeClaim) + return !podutils.IsESPVC(curr) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { if deleteEvent.DeleteStateUnknown { @@ -116,6 +119,11 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error { } pvc := deleteEvent.Object.(*v1.PersistentVolumeClaim) + // skip es pvc, oneway_pv_controller will handle this PVC + if podutils.IsESPVC(pvc) { + return false + } + clusters := utils.ListResourceClusters(pvc.Annotations) if len(clusters) == 0 { klog.V(4).Infof("pvc leaf %q: %q doesn't existed", deleteEvent.Object.GetNamespace(), deleteEvent.Object.GetName()) diff --git a/pkg/utils/podutils/es.go b/pkg/utils/podutils/es.go new file mode 100644 index 000000000..80b7b6d6d --- /dev/null +++ b/pkg/utils/podutils/es.go @@ -0,0 +1,78 @@ +package podutils + +import ( + "context" + "os" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + + "github.com/kosmos.io/kosmos/pkg/utils" +) + +var VolumePathGVR = schema.GroupVersionResource{ + Version: os.Getenv("ONEWAY-VERSION"), + Group: os.Getenv("ONEWAY-GROUP"), + Resource: os.Getenv("ONEWAY-RESOURCE"), +} + +var ( + csiDriverName = os.Getenv("ONEWAY-CSI-DRIVER-NAME") + vpAnnotationKey = os.Getenv("ONEWAY-CSI-ANNOTATION-KEY") +) + +func NoESPVCFilter(ctx context.Context, dynamicRootClient dynamic.Interface, pvcNames []string, ns string) ([]string, error) { + if len(os.Getenv("USE-ONEWAY-STORAGE")) == 0 { + return pvcNames, nil + } + result := []string{} + for _, pvcName := range pvcNames { + f, err := IsESPVCByName(ctx, dynamicRootClient, pvcName, ns) + if err != nil { + return nil, err + } + if !f { + result = append(result, pvcName) + } + } + return result, nil +} + +func IsESPVCByName(ctx context.Context, dynamicRootClient dynamic.Interface, pvcName string, ns string) (bool, error) { + rootobj, err := dynamicRootClient.Resource(utils.GVR_PVC).Namespace(ns).Get(ctx, pvcName, metav1.GetOptions{}) + if err != nil { + return false, err + } + + pvcObj := &corev1.PersistentVolumeClaim{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(rootobj.Object, pvcObj) + if err != nil { + return false, err + } + + return IsESPVC(pvcObj), nil +} + +func IsESPVC(pvc *corev1.PersistentVolumeClaim) bool { + if len(os.Getenv("USE-ONEWAY-STORAGE")) == 0 { + return false + } + anno := pvc.GetAnnotations() + if anno == nil { + return false + } + if _, ok := anno[vpAnnotationKey]; ok { + return true + } + return false +} + +func IsESPV(pv *corev1.PersistentVolume) bool { + if len(os.Getenv("USE-ONEWAY-STORAGE")) == 0 { + return false + } + return pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csiDriverName +}