From 935578abd928a2a621be050eb91c230f43a51ff0 Mon Sep 17 00:00:00 2001 From: TzZtzt Date: Wed, 26 Jan 2022 14:40:11 +0800 Subject: [PATCH] Refactor Fluid CSI Plugin (#1395) * Refactor Fluid CSI Plugin Signed-off-by: TrafalgarZZZ * Go fmt Signed-off-by: TrafalgarZZZ * Option to enable fuse mount point recovery Signed-off-by: TrafalgarZZZ * Refactor Fluid CSI Plugin - Move package 'mountinfo' to utils - Move package 'fuse' to plugin - Add registry functions Signed-off-by: TrafalgarZZZ * Minor fix for fuse recover period Signed-off-by: TrafalgarZZZ * Remove duplicate license Signed-off-by: TrafalgarZZZ * Declare registraion funcs to enable/disable feature gates Signed-off-by: TrafalgarZZZ * Add tests Signed-off-by: TrafalgarZZZ * Add tests Signed-off-by: TrafalgarZZZ * Fix copyright Signed-off-by: TrafalgarZZZ --- cmd/csi/app/csi.go | 87 ++--------- pkg/csi/config/config.go | 23 +++ pkg/csi/{fuse => plugins}/controller.go | 2 +- pkg/csi/{fuse => plugins}/driver.go | 29 ++-- pkg/csi/{fuse => plugins}/nodeserver.go | 2 +- pkg/csi/plugins/register.go | 38 +++++ pkg/csi/recover/recover.go | 106 ++++++++++++-- pkg/csi/recover/recover_test.go | 138 ++++++++++++++++-- pkg/csi/recover/register.go | 41 ++++++ pkg/csi/register.go | 56 +++++++ pkg/{csi => utils}/mountinfo/mountinfo.go | 0 .../mountinfo/mountinfo_test.go | 0 pkg/{csi => utils}/mountinfo/mountpoint.go | 0 .../mountinfo/mountpoint_test.go | 0 14 files changed, 407 insertions(+), 115 deletions(-) create mode 100644 pkg/csi/config/config.go rename pkg/csi/{fuse => plugins}/controller.go (99%) rename pkg/csi/{fuse => plugins}/driver.go (81%) rename pkg/csi/{fuse => plugins}/nodeserver.go (99%) create mode 100644 pkg/csi/plugins/register.go create mode 100644 pkg/csi/recover/register.go create mode 100644 pkg/csi/register.go rename pkg/{csi => utils}/mountinfo/mountinfo.go (100%) rename pkg/{csi => utils}/mountinfo/mountinfo_test.go (100%) rename pkg/{csi => utils}/mountinfo/mountpoint.go (100%) rename pkg/{csi => utils}/mountinfo/mountpoint_test.go (100%) diff --git a/cmd/csi/app/csi.go b/cmd/csi/app/csi.go index 69fa5f58c4a..502403f7939 100644 --- a/cmd/csi/app/csi.go +++ b/cmd/csi/app/csi.go @@ -23,24 +23,16 @@ import ( "fmt" "github.com/fluid-cloudnative/fluid" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - csi "github.com/fluid-cloudnative/fluid/pkg/csi/fuse" - "github.com/fluid-cloudnative/fluid/pkg/csi/recover" - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubelet" + "github.com/fluid-cloudnative/fluid/pkg/csi" + "github.com/fluid-cloudnative/fluid/pkg/csi/config" "github.com/golang/glog" "github.com/spf13/cobra" - "io/ioutil" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" "net/http" "net/http/pprof" "os" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" "time" ) @@ -52,8 +44,6 @@ var ( recoverFusePeriod int ) -const defaultKubeletTimeout = 10 - var scheme = runtime.NewScheme() var startCmd = &cobra.Command{ @@ -143,74 +133,21 @@ func handle() { }) if err != nil { - panic(fmt.Sprintf("csi: unable to create controller recover due to error %v", err)) + panic(fmt.Sprintf("csi: unable to create controller manager due to error %v", err)) } - ctx := ctrl.SetupSignalHandler() - go func() { - if err := mgr.Start(ctx); err != nil { - panic(fmt.Sprintf("unable to start controller recover due to error %v", err)) - } - }() - - if recoverFusePeriod > 0 { - if err := recoverStart(mgr.GetClient(), mgr.GetEventRecorderFor("FuseRecover")); err != nil { - panic(fmt.Sprintf("unable to start recover due to error %v", err)) - } + config := config.Config{ + NodeId: nodeID, + Endpoint: endpoint, + RecoverFusePeriod: recoverFusePeriod, } - d := csi.NewDriver(nodeID, endpoint, mgr.GetClient()) - d.Run() -} - -func recoverStart(kubeClient client.Client, recorder record.EventRecorder) (err error) { - glog.V(3).Infoln("start csi recover") - mountRoot, err := utils.GetMountRoot() - if err != nil { - return + if err = csi.SetupWithManager(mgr, config); err != nil { + panic(fmt.Sprintf("unable to set up manager due to error %v", err)) } - glog.V(3).Infof("Get mount root: %s", mountRoot) - // get CSI sa token - tokenByte, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") - if err != nil { - panic(fmt.Errorf("in cluster mode, find token failed, error: %v", err)) - } - token := string(tokenByte) - - glog.V(3).Infoln("start kubelet client") - nodeIp := os.Getenv("NODE_IP") - kubeletClientCert := os.Getenv("KUBELET_CLIENT_CERT") - kubeletClientKey := os.Getenv("KUBELET_CLIENT_KEY") - var kubeletTimeout int - if os.Getenv("KUBELET_TIMEOUT") != "" { - if kubeletTimeout, err = strconv.Atoi(os.Getenv("KUBELET_TIMEOUT")); err != nil { - glog.Errorf("parse kubelet timeout error: %v", err) - return - } - } else { - kubeletTimeout = defaultKubeletTimeout - } - glog.V(3).Infof("get node ip: %s", nodeIp) - kubeletClient, err := kubelet.NewKubeletClient(&kubelet.KubeletClientConfig{ - Address: nodeIp, - Port: 10250, - TLSClientConfig: rest.TLSClientConfig{ - ServerName: "kubelet", - CertFile: kubeletClientCert, - KeyFile: kubeletClientKey, - }, - BearerToken: token, - HTTPTimeout: time.Duration(kubeletTimeout) * time.Second, - }) - if err != nil { - glog.Error(err) - return + ctx := ctrl.SetupSignalHandler() + if err = mgr.Start(ctx); err != nil { + panic(fmt.Sprintf("unable to start controller recover due to error %v", err)) } - m := recover.NewFuseRecoder(kubeClient, kubeletClient, recorder) - // do recovering at beginning - // recover set containerStat in memory, it's none when start - m.Recover() - go m.Run(recoverFusePeriod, wait.NeverStop) - return } diff --git a/pkg/csi/config/config.go b/pkg/csi/config/config.go new file mode 100644 index 00000000000..af73ba383d3 --- /dev/null +++ b/pkg/csi/config/config.go @@ -0,0 +1,23 @@ +/* +Copyright 2022 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +type Config struct { + NodeId string + Endpoint string + RecoverFusePeriod int +} diff --git a/pkg/csi/fuse/controller.go b/pkg/csi/plugins/controller.go similarity index 99% rename from pkg/csi/fuse/controller.go rename to pkg/csi/plugins/controller.go index 2945775d944..1372ed4d875 100644 --- a/pkg/csi/fuse/controller.go +++ b/pkg/csi/plugins/controller.go @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package csi +package plugins import ( "crypto/sha1" diff --git a/pkg/csi/fuse/driver.go b/pkg/csi/plugins/driver.go similarity index 81% rename from pkg/csi/fuse/driver.go rename to pkg/csi/plugins/driver.go index e1a976ad1c9..ea559c1e59e 100644 --- a/pkg/csi/fuse/driver.go +++ b/pkg/csi/plugins/driver.go @@ -13,28 +13,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -/* - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package csi +package plugins import ( + "context" "fmt" "os" "path/filepath" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" "strings" "github.com/container-storage-interface/spec/lib/go/csi" @@ -54,6 +41,8 @@ type driver struct { nodeId, endpoint string } +var _ manager.Runnable = &driver{} + func NewDriver(nodeID, endpoint string, client client.Client) *driver { glog.Infof("Driver: %v version: %v", driverName, version) @@ -88,6 +77,7 @@ func (d *driver) newControllerServer() *controllerServer { DefaultControllerServer: csicommon.NewDefaultControllerServer(d.csiDriver), } } + func (d *driver) newNodeServer() *nodeServer { return &nodeServer{ nodeId: d.nodeId, @@ -96,7 +86,7 @@ func (d *driver) newNodeServer() *nodeServer { } } -func (d *driver) Run() { +func (d *driver) run() { s := csicommon.NewNonBlockingGRPCServer() s.Start( d.endpoint, @@ -106,3 +96,8 @@ func (d *driver) Run() { ) s.Wait() } + +func (d *driver) Start(ctx context.Context) error { + d.run() + return nil +} diff --git a/pkg/csi/fuse/nodeserver.go b/pkg/csi/plugins/nodeserver.go similarity index 99% rename from pkg/csi/fuse/nodeserver.go rename to pkg/csi/plugins/nodeserver.go index 8b7a777dfd6..5960f3afa2e 100644 --- a/pkg/csi/fuse/nodeserver.go +++ b/pkg/csi/plugins/nodeserver.go @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package csi +package plugins import ( "fmt" diff --git a/pkg/csi/plugins/register.go b/pkg/csi/plugins/register.go new file mode 100644 index 00000000000..206c77ff518 --- /dev/null +++ b/pkg/csi/plugins/register.go @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import ( + "github.com/fluid-cloudnative/fluid/pkg/csi/config" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// Register initializes the csi driver and registers it to the controller manager. +func Register(mgr manager.Manager, cfg config.Config) error { + csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient()) + + if err := mgr.Add(csiDriver); err != nil { + return err + } + + return nil +} + +// Enabled checks if the csi driver should be enabled. +func Enabled(cfg config.Config) bool { + return true +} diff --git a/pkg/csi/recover/recover.go b/pkg/csi/recover/recover.go index 1ddbd1edf09..be39930666b 100644 --- a/pkg/csi/recover/recover.go +++ b/pkg/csi/recover/recover.go @@ -17,25 +17,39 @@ limitations under the License. package recover import ( + "context" "fmt" "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/csi/mountinfo" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/volume" "github.com/fluid-cloudnative/fluid/pkg/utils/kubelet" + "github.com/fluid-cloudnative/fluid/pkg/utils/mountinfo" "github.com/golang/glog" + "github.com/pkg/errors" + "io/ioutil" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" podutil "k8s.io/kubernetes/pkg/api/v1/pod" k8sexec "k8s.io/utils/exec" "k8s.io/utils/mount" + "os" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "strconv" "strings" "time" ) +const ( + defaultKubeletTimeout = 10 + serviceAccountTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" +) + +var _ manager.Runnable = &FuseRecover{} + type FuseRecover struct { mount.SafeFormatAndMount KubeClient client.Client @@ -43,6 +57,8 @@ type FuseRecover struct { Recorder record.EventRecorder containers map[string]*containerStat // key: -- + + recoverFusePeriod int } type containerStat struct { @@ -53,26 +69,92 @@ type containerStat struct { startAt metav1.Time } -func NewFuseRecoder(kubeClient client.Client, kubeletClient *kubelet.KubeletClient, recorder record.EventRecorder) *FuseRecover { +func initializeKubeletClient() (*kubelet.KubeletClient, error) { + // get CSI sa token + tokenByte, err := ioutil.ReadFile(serviceAccountTokenFile) + if err != nil { + return nil, errors.Wrap(err, "in cluster mode, find token failed") + } + token := string(tokenByte) + + glog.V(3).Infoln("start kubelet client") + nodeIp := os.Getenv("NODE_IP") + kubeletClientCert := os.Getenv("KUBELET_CLIENT_CERT") + kubeletClientKey := os.Getenv("KUBELET_CLIENT_KEY") + var kubeletTimeout int + if os.Getenv("KUBELET_TIMEOUT") != "" { + if kubeletTimeout, err = strconv.Atoi(os.Getenv("KUBELET_TIMEOUT")); err != nil { + return nil, errors.Wrap(err, "got error when parsing kubelet timeout") + } + } else { + kubeletTimeout = defaultKubeletTimeout + } + glog.V(3).Infof("get node ip: %s", nodeIp) + kubeletClient, err := kubelet.NewKubeletClient(&kubelet.KubeletClientConfig{ + Address: nodeIp, + Port: 10250, + TLSClientConfig: rest.TLSClientConfig{ + ServerName: "kubelet", + CertFile: kubeletClientCert, + KeyFile: kubeletClientKey, + }, + BearerToken: token, + HTTPTimeout: time.Duration(kubeletTimeout) * time.Second, + }) + + if err != nil { + return nil, err + } + + return kubeletClient, nil +} + +func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, recoverFusePeriod int) (*FuseRecover, error) { + glog.V(3).Infoln("start csi recover") + mountRoot, err := utils.GetMountRoot() + if err != nil { + return nil, errors.Wrap(err, "got err when getting mount root") + } + glog.V(3).Infof("Get mount root: %s", mountRoot) + + if err != nil { + return nil, errors.Wrap(err, "got error when creating kubelet client") + } + + kubeletClient, err := initializeKubeletClient() + if err != nil { + return nil, errors.Wrap(err, "failed to initialize kubelet") + } + return &FuseRecover{ SafeFormatAndMount: mount.SafeFormatAndMount{ Interface: mount.New(""), Exec: k8sexec.New(), }, - KubeClient: kubeClient, - KubeletClient: kubeletClient, - Recorder: recorder, - containers: make(map[string]*containerStat), - } + KubeClient: kubeClient, + KubeletClient: kubeletClient, + Recorder: recorder, + containers: make(map[string]*containerStat), + recoverFusePeriod: recoverFusePeriod, + }, nil +} + +func (r *FuseRecover) Start(ctx context.Context) error { + // do recovering at beginning + // recover set containerStat in memory, it's none when start + r.recover() + r.run(wait.NeverStop) + + return nil } -func (r *FuseRecover) Run(period int, stopCh <-chan struct{}) { - go wait.Until(r.run, time.Duration(period)*time.Second, stopCh) +func (r *FuseRecover) run(stopCh <-chan struct{}) { + go wait.Until(r.runOnce, time.Duration(r.recoverFusePeriod)*time.Second, stopCh) <-stopCh glog.V(3).Info("Shutdown CSI recover.") } -func (r *FuseRecover) run() { +func (r *FuseRecover) runOnce() { pods, err := r.KubeletClient.GetNodeRunningPods() glog.V(6).Info("get pods from kubelet") if err != nil { @@ -90,13 +172,13 @@ func (r *FuseRecover) run() { glog.V(6).Infof("get fluid fuse pod: %s, namespace: %s", pod.Name, pod.Namespace) if isRestarted := r.compareOrRecordContainerStat(pod); isRestarted { glog.V(3).Infof("fuse pod restarted: %s, namespace: %s", pod.Name, pod.Namespace) - r.Recover() + r.recover() return } } } -func (r FuseRecover) Recover() { +func (r FuseRecover) recover() { brokenMounts, err := mountinfo.GetBrokenMountPoints() if err != nil { glog.Error(err) diff --git a/pkg/csi/recover/recover_test.go b/pkg/csi/recover/recover_test.go index e04e56a9d5e..7ee9cc6ee5f 100644 --- a/pkg/csi/recover/recover_test.go +++ b/pkg/csi/recover/recover_test.go @@ -21,20 +21,27 @@ import ( . "github.com/agiledragon/gomonkey" "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/csi/mountinfo" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/fluid-cloudnative/fluid/pkg/utils/kubelet" + "github.com/fluid-cloudnative/fluid/pkg/utils/mountinfo" . "github.com/smartystreets/goconvey/convey" + "io/ioutil" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachineryRuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + k8sexec "k8s.io/utils/exec" "k8s.io/utils/mount" + "os" "reflect" + "sigs.k8s.io/controller-runtime/pkg/client" "testing" "time" ) +const testfuseRecoverPeriod = 30 + var mockPod = v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{"role": "juicefs-fuse"}, @@ -60,6 +67,33 @@ var mockPod = v1.Pod{ }, } +func Test_initializeKubeletClient(t *testing.T) { + Convey("Test_initializeKubeletClient", t, func() { + Convey("initialize success with non-default kubelet timeout", func() { + const ( + fakeToken = "fakeToken" + fakeNodeIP = "fakeNodeIP" + fakeClientCert = "" + fakeClientKey = "" + fakeKubeletTimeout = "120" + ) + patch1 := ApplyFunc(ioutil.ReadFile, func(filename string) ([]byte, error) { + return []byte(fakeToken), nil + }) + defer patch1.Reset() + + os.Setenv("NODE_IP", fakeNodeIP) + os.Setenv("KUBELET_CLIENT_CERT", fakeClientCert) + os.Setenv("KUBELET_CLIENT_KEY", fakeClientKey) + os.Setenv("KUBELET_TIMEOUT", fakeKubeletTimeout) + + kubeletClient, err := initializeKubeletClient() + So(err, ShouldBeNil) + So(kubeletClient, ShouldNotBeNil) + }) + }) +} + func TestRecover_run(t *testing.T) { Convey("TestRecover_run", t, func() { Convey("run success", func() { @@ -80,11 +114,17 @@ func TestRecover_run(t *testing.T) { }) defer patch2.Reset() - r := NewFuseRecoder(fake.NewFakeClient(), kubeclient, record.NewFakeRecorder(1)) - r.SafeFormatAndMount = mount.SafeFormatAndMount{ - Interface: &mount.FakeMounter{}, + r := &FuseRecover{ + SafeFormatAndMount: mount.SafeFormatAndMount{ + Interface: &mount.FakeMounter{}, + }, + KubeClient: fake.NewFakeClient(), + KubeletClient: kubeclient, + Recorder: record.NewFakeRecorder(1), + containers: make(map[string]*containerStat), + recoverFusePeriod: testfuseRecoverPeriod, } - r.run() + r.runOnce() }) Convey("GetNodeRunningPods error", func() { kubeclient := &kubelet.KubeletClient{} @@ -103,7 +143,7 @@ func TestRecover_run(t *testing.T) { KubeletClient: &kubelet.KubeletClient{}, Recorder: record.NewFakeRecorder(1), } - r.run() + r.runOnce() }) Convey("container restart", func() { kubeclient := &kubelet.KubeletClient{} @@ -116,7 +156,17 @@ func TestRecover_run(t *testing.T) { }) defer patch2.Reset() - r := NewFuseRecoder(fake.NewFakeClient(), kubeclient, record.NewFakeRecorder(1)) + r := &FuseRecover{ + SafeFormatAndMount: mount.SafeFormatAndMount{ + Interface: &mount.FakeMounter{}, + }, + KubeClient: fake.NewFakeClient(), + KubeletClient: kubeclient, + Recorder: record.NewFakeRecorder(1), + containers: make(map[string]*containerStat), + recoverFusePeriod: testfuseRecoverPeriod, + } + r.containers = map[string]*containerStat{ "test-container-test-juicefs-fuse-default": { name: "test-container", @@ -128,7 +178,7 @@ func TestRecover_run(t *testing.T) { }, }, } - r.run() + r.runOnce() }) }) } @@ -246,7 +296,16 @@ func TestFuseRecover_compareOrRecordContainerStat(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { kubeletClient := &kubelet.KubeletClient{} - r := NewFuseRecoder(fake.NewFakeClient(), kubeletClient, record.NewFakeRecorder(1)) + r := &FuseRecover{ + SafeFormatAndMount: mount.SafeFormatAndMount{ + Interface: &mount.FakeMounter{}, + }, + KubeClient: fake.NewFakeClient(), + KubeletClient: kubeletClient, + Recorder: record.NewFakeRecorder(1), + containers: make(map[string]*containerStat), + recoverFusePeriod: testfuseRecoverPeriod, + } if tt.fields.container != nil { r.containers[tt.fields.key] = tt.fields.container } @@ -402,3 +461,64 @@ func TestFuseRecover_eventRecord(t *testing.T) { }) } } + +func TestNewFuseRecover(t *testing.T) { + type args struct { + kubeClient client.Client + recorder record.EventRecorder + recoverFusePeriod int + } + + fakeClient := fake.NewFakeClient() + fakeRecorder := record.NewFakeRecorder(1) + fakeKubeletClient := &kubelet.KubeletClient{} + fakeContainersMap := make(map[string]*containerStat) + fakeRecoverFusePeriod := 20 + + tests := []struct { + name string + args args + want *FuseRecover + wantErr bool + }{ + { + name: "test_newFuseRecover", + args: args{ + kubeClient: fakeClient, + recorder: fakeRecorder, + recoverFusePeriod: fakeRecoverFusePeriod, + }, + want: &FuseRecover{ + SafeFormatAndMount: mount.SafeFormatAndMount{ + Interface: mount.New(""), + Exec: k8sexec.New(), + }, + KubeClient: fakeClient, + KubeletClient: fakeKubeletClient, + Recorder: fakeRecorder, + containers: fakeContainersMap, + recoverFusePeriod: fakeRecoverFusePeriod, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + os.Setenv(utils.MountRoot, "/runtime-mnt") + + patch := ApplyFunc(initializeKubeletClient, func() (*kubelet.KubeletClient, error) { + return fakeKubeletClient, nil + }) + defer patch.Reset() + + got, err := NewFuseRecover(tt.args.kubeClient, tt.args.recorder, tt.args.recoverFusePeriod) + if (err != nil) != tt.wantErr { + t.Errorf("NewFuseRecover() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewFuseRecover() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/csi/recover/register.go b/pkg/csi/recover/register.go new file mode 100644 index 00000000000..d65cb266a79 --- /dev/null +++ b/pkg/csi/recover/register.go @@ -0,0 +1,41 @@ +/* +Copyright 2022 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recover + +import ( + "github.com/fluid-cloudnative/fluid/pkg/csi/config" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// Register initializes the fuse recover and registers it to the controller manager. +func Register(mgr manager.Manager, config config.Config) error { + fuseRecover, err := NewFuseRecover(mgr.GetClient(), mgr.GetEventRecorderFor("FuseRecover"), config.RecoverFusePeriod) + if err != nil { + return err + } + + if err = mgr.Add(fuseRecover); err != nil { + return err + } + + return nil +} + +// Enabled checks if the fuse recover should be enabled. +func Enabled(cfg config.Config) bool { + return cfg.RecoverFusePeriod > 0 +} diff --git a/pkg/csi/register.go b/pkg/csi/register.go new file mode 100644 index 00000000000..49c7f86846b --- /dev/null +++ b/pkg/csi/register.go @@ -0,0 +1,56 @@ +/* +Copyright 2022 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "github.com/fluid-cloudnative/fluid/pkg/csi/config" + "github.com/fluid-cloudnative/fluid/pkg/csi/plugins" + "github.com/fluid-cloudnative/fluid/pkg/csi/recover" + "github.com/golang/glog" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type registrationFuncs struct { + enabled func(cfg config.Config) bool + register func(mgr manager.Manager, cfg config.Config) error +} + +var registraions map[string]registrationFuncs + +func init() { + registraions = map[string]registrationFuncs{} + + registraions["plugins"] = registrationFuncs{enabled: plugins.Enabled, register: plugins.Register} + registraions["recover"] = registrationFuncs{enabled: recover.Enabled, register: recover.Register} +} + +// SetupWithManager registers all the enabled components defined in registrations to the controller manager. +func SetupWithManager(mgr manager.Manager, cfg config.Config) error { + for rName, r := range registraions { + if r.enabled(cfg) { + glog.Infof("Registering %s to controller manager", rName) + if err := r.register(mgr, cfg); err != nil { + glog.Errorf("Got error when registering %s, error: %v", rName, err) + return err + } + } else { + glog.Infof("%s is not enabled", rName) + } + } + + return nil +} diff --git a/pkg/csi/mountinfo/mountinfo.go b/pkg/utils/mountinfo/mountinfo.go similarity index 100% rename from pkg/csi/mountinfo/mountinfo.go rename to pkg/utils/mountinfo/mountinfo.go diff --git a/pkg/csi/mountinfo/mountinfo_test.go b/pkg/utils/mountinfo/mountinfo_test.go similarity index 100% rename from pkg/csi/mountinfo/mountinfo_test.go rename to pkg/utils/mountinfo/mountinfo_test.go diff --git a/pkg/csi/mountinfo/mountpoint.go b/pkg/utils/mountinfo/mountpoint.go similarity index 100% rename from pkg/csi/mountinfo/mountpoint.go rename to pkg/utils/mountinfo/mountpoint.go diff --git a/pkg/csi/mountinfo/mountpoint_test.go b/pkg/utils/mountinfo/mountpoint_test.go similarity index 100% rename from pkg/csi/mountinfo/mountpoint_test.go rename to pkg/utils/mountinfo/mountpoint_test.go