Skip to content

Commit

Permalink
Merge pull request kosmos-io#384 from OrangeBao/dev_sc
Browse files Browse the repository at this point in the history
feat: supports the coexistence of multiple storage provisioning methods
  • Loading branch information
kosmos-robot authored Jan 24, 2024
2 parents 58f4c76 + dbc5219 commit 76f614b
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 70 deletions.
32 changes: 16 additions & 16 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,23 +226,23 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("error starting rootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err)
}

if !opts.OnewayStorageControllers {
rootPVCController := pvc.RootPVCController{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
}
if err := rootPVCController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root pvc controller %v", err)
}
rootPVCController := pvc.RootPVCController{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
}
if err := rootPVCController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root pvc controller %v", err)
}

rootPVController := pv.RootPVController{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
}
if err := rootPVController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root pv controller %v", err)
}
} else {
rootPVController := pv.RootPVController{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
}
if err := rootPVController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root pv controller %v", err)
}

if len(os.Getenv("USE-ONEWAY-STORAGE")) > 0 {
onewayPVController := pv.OnewayPVController{
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
Expand Down
8 changes: 3 additions & 5 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,9 @@ func (c *ClusterController) setupControllers(
return fmt.Errorf("error starting podUpstreamReconciler %s: %v", podcontrollers.LeafPodControllerName, err)
}

if !c.Options.OnewayStorageControllers {
err := c.setupStorageControllers(mgr, utils.IsOne2OneMode(cluster), cluster.Name)
if err != nil {
return err
}
err := c.setupStorageControllers(mgr, utils.IsOne2OneMode(cluster), cluster.Name)
if err != nil {
return err
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,14 +739,18 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
// pvc
go func() {
if err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
if !r.Options.OnewayStorageControllers {
klog.V(4).Info("Trying to creating dependent pvc")
if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcs, basicPod, clusterNodeInfo); err != nil {
klog.Error(err)
return false, nil
}
klog.V(4).Infof("Create pvc %v of %v/%v success", pvcs, basicPod.Namespace, basicPod.Name)
pvcsWithoutEs, err := podutils.NoOneWayPVCFilter(ctx, r.DynamicRootClient, pvcs, basicPod.Namespace)
if err != nil {
klog.Error(err)
return false, err
}
klog.V(4).Info("Trying to creating dependent pvc")
if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcsWithoutEs, basicPod, clusterNodeInfo); err != nil {
klog.Error(err)
return false, nil
}
klog.V(4).Infof("Create pvc %v of %v/%v success", pvcsWithoutEs, basicPod.Namespace, basicPod.Name)
// }
return true, nil
}); err != nil {
ch <- fmt.Sprintf("create pvc failed: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
Expand Down Expand Up @@ -177,13 +178,16 @@ func (l *LeafPVController) SetupWithManager(mgr manager.Manager) error {
WithOptions(controller.Options{}).
For(&v1.PersistentVolume{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return true
curr := createEvent.Object.(*v1.PersistentVolume)
return !podutils.IsOneWayPV(curr)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return true
curr := updateEvent.ObjectNew.(*v1.PersistentVolume)
return !podutils.IsOneWayPV(curr)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
curr := deleteEvent.Object.(*v1.PersistentVolume)
return !podutils.IsOneWayPV(curr)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/klog"
Expand All @@ -23,21 +22,15 @@ import (

leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
controllerName = "oneway-pv-controller"
requeueTime = 10 * time.Second
quickRequeueTime = 3 * time.Second
csiDriverName = "infini.volumepath.csi"
)

var VolumePathGVR = schema.GroupVersionResource{
Version: "v1alpha1",
Group: "lvm.infinilabs.com",
Resource: "volumepaths",
}

type OnewayPVController struct {
Root client.Client
RootDynamic dynamic.Interface
Expand All @@ -48,15 +41,15 @@ func (c *OnewayPVController) SetupWithManager(mgr manager.Manager) error {
predicatesFunc := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
curr := createEvent.Object.(*corev1.PersistentVolume)
return curr.Spec.CSI != nil && curr.Spec.CSI.Driver == csiDriverName
return podutils.IsOneWayPV(curr)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
curr := updateEvent.ObjectNew.(*corev1.PersistentVolume)
return curr.Spec.CSI != nil && curr.Spec.CSI.Driver == csiDriverName
return podutils.IsOneWayPV(curr)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
curr := deleteEvent.Object.(*corev1.PersistentVolume)
return curr.Spec.CSI != nil && curr.Spec.CSI.Driver == csiDriverName
return podutils.IsOneWayPV(curr)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
Expand Down Expand Up @@ -84,7 +77,7 @@ func (c *OnewayPVController) Reconcile(ctx context.Context, request reconcile.Re
}

// volumePath has the same name with pv
vp, err := c.RootDynamic.Resource(VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{})
vp, err := c.RootDynamic.Resource(podutils.VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Infof("vp %s not found", request.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
Expand Down Expand Up @@ -52,6 +53,11 @@ func (r *RootPVController) SetupWithManager(mgr manager.Manager) error {
}

pv := deleteEvent.Object.(*v1.PersistentVolume)
// skip one way pv, oneway_pv_controller will handle this PV
if podutils.IsOneWayPV(pv) {
return false
}

clusters := utils.ListResourceClusters(pv.Annotations)
if len(clusters) == 0 {
klog.Warningf("pv leaf %q doesn't existed", deleteEvent.Object.GetName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (l *LeafPVCController) SetupWithManager(mgr manager.Manager) error {
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
pvc := updateEvent.ObjectOld.(*v1.PersistentVolumeClaim)
return utils.IsObjectGlobal(&pvc.ObjectMeta)
return utils.IsObjectGlobal(&pvc.ObjectMeta) && !podutils.IsOneWayPVC(pvc)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/klog"
Expand All @@ -23,50 +22,33 @@ import (

leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
controllerName = "oneway-pvc-controller"
requeueTime = 10 * time.Second
vpAnnotationKey = "volumepath"
controllerName = "oneway-pvc-controller"
requeueTime = 10 * time.Second
)

var VolumePathGVR = schema.GroupVersionResource{
Version: "v1alpha1",
Group: "lvm.infinilabs.com",
Resource: "volumepaths",
}

type OnewayPVCController struct {
Root client.Client
RootDynamic dynamic.Interface
GlobalLeafManager leafUtils.LeafResourceManager
}

func pvcEventFilter(pvc *corev1.PersistentVolumeClaim) bool {
anno := pvc.GetAnnotations()
if anno == nil {
return false
}
if _, ok := anno[vpAnnotationKey]; ok {
return true
}
return false
}

func (c *OnewayPVCController) SetupWithManager(mgr manager.Manager) error {
predicatesFunc := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
curr := createEvent.Object.(*corev1.PersistentVolumeClaim)
return pvcEventFilter(curr)
return podutils.IsOneWayPVC(curr)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
curr := updateEvent.ObjectNew.(*corev1.PersistentVolumeClaim)
return pvcEventFilter(curr)
return podutils.IsOneWayPVC(curr)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
curr := deleteEvent.Object.(*corev1.PersistentVolumeClaim)
return pvcEventFilter(curr)
return podutils.IsOneWayPVC(curr)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
Expand All @@ -93,7 +75,7 @@ func (c *OnewayPVCController) Reconcile(ctx context.Context, request reconcile.R
}

// volumePath has the same name with pvc
vp, err := c.RootDynamic.Resource(VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{})
vp, err := c.RootDynamic.Resource(podutils.VolumePathGVR).Get(ctx, request.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Infof("vp %s not found", request.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
Expand Down Expand Up @@ -106,7 +107,9 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error {
return false
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return true
// skip one way pvc, oneway_pv_controller will handle this PVC
curr := updateEvent.ObjectNew.(*v1.PersistentVolumeClaim)
return !podutils.IsOneWayPVC(curr)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
if deleteEvent.DeleteStateUnknown {
Expand All @@ -116,6 +119,11 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error {
}

pvc := deleteEvent.Object.(*v1.PersistentVolumeClaim)
// skip one way pvc, oneway_pv_controller will handle this PVC
if podutils.IsOneWayPVC(pvc) {
return false
}

clusters := utils.ListResourceClusters(pvc.Annotations)
if len(clusters) == 0 {
klog.V(4).Infof("pvc leaf %q: %q doesn't existed", deleteEvent.Object.GetNamespace(), deleteEvent.Object.GetName())
Expand Down
78 changes: 78 additions & 0 deletions pkg/utils/podutils/es.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package podutils

import (
"context"
"os"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

"github.com/kosmos.io/kosmos/pkg/utils"
)

var VolumePathGVR = schema.GroupVersionResource{
Version: os.Getenv("ONEWAY-VERSION"),
Group: os.Getenv("ONEWAY-GROUP"),
Resource: os.Getenv("ONEWAY-RESOURCE"),
}

var (
csiDriverName = os.Getenv("ONEWAY-CSI-DRIVER-NAME")
vpAnnotationKey = os.Getenv("ONEWAY-CSI-ANNOTATION-KEY")
)

func NoOneWayPVCFilter(ctx context.Context, dynamicRootClient dynamic.Interface, pvcNames []string, ns string) ([]string, error) {
if len(os.Getenv("USE-ONEWAY-STORAGE")) == 0 {
return pvcNames, nil
}
result := []string{}
for _, pvcName := range pvcNames {
f, err := IsOneWayPVCByName(ctx, dynamicRootClient, pvcName, ns)
if err != nil {
return nil, err
}
if !f {
result = append(result, pvcName)
}
}
return result, nil
}

func IsOneWayPVCByName(ctx context.Context, dynamicRootClient dynamic.Interface, pvcName string, ns string) (bool, error) {
rootobj, err := dynamicRootClient.Resource(utils.GVR_PVC).Namespace(ns).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
return false, err
}

pvcObj := &corev1.PersistentVolumeClaim{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(rootobj.Object, pvcObj)
if err != nil {
return false, err
}

return IsOneWayPVC(pvcObj), nil
}

func IsOneWayPVC(pvc *corev1.PersistentVolumeClaim) bool {
if len(os.Getenv("USE-ONEWAY-STORAGE")) == 0 {
return false
}
anno := pvc.GetAnnotations()
if anno == nil {
return false
}
if _, ok := anno[vpAnnotationKey]; ok {
return true
}
return false
}

func IsOneWayPV(pv *corev1.PersistentVolume) bool {
if len(os.Getenv("USE-ONEWAY-STORAGE")) == 0 {
return false
}
return pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csiDriverName
}

0 comments on commit 76f614b

Please sign in to comment.