From d25d80b72c46465d96bfd154906aaa23aca4c951 Mon Sep 17 00:00:00 2001 From: Nizifan <505529920@qq.com> Date: Wed, 19 Jan 2022 17:30:03 +0800 Subject: [PATCH] Add remount check for hostpath mount during sync (#1340) * Add remount check for hostpath mount during sync Signed-off-by: nizifan * Fix CI issue & rename function Signed-off-by: nizifan * Add ut & fix potential leak of ufstoupdate.toadd Signed-off-by: nizifan * improve test coverage & address comments - add new line Signed-off-by: nizifan * Fix ci issue Signed-off-by: nizifan * Only perform remount operation when mounttime ealier than master starting time Signed-off-by: nizifan * Fix ci issue Signed-off-by: nizifan * add ut Signed-off-by: nizifan * rename function Signed-off-by: nizifan Co-authored-by: nizifan Signed-off-by: zwwhdls --- api/v1alpha1/status.go | 4 + pkg/ddc/alluxio/operations/base.go | 24 ++ pkg/ddc/alluxio/operations/base_test.go | 49 ++++ pkg/ddc/alluxio/ufs.go | 52 +++++ pkg/ddc/alluxio/ufs_internal.go | 80 ++++++- pkg/ddc/alluxio/ufs_test.go | 287 ++++++++++++++++++++++++ pkg/ddc/alluxio/utils.go | 10 + pkg/ddc/alluxio/utils_test.go | 62 +++++ pkg/utils/dataset.go | 19 ++ pkg/utils/dataset_test.go | 42 ++++ 10 files changed, 628 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/status.go b/api/v1alpha1/status.go index a36e7681ef6..af8a9a720ba 100644 --- a/api/v1alpha1/status.go +++ b/api/v1alpha1/status.go @@ -120,6 +120,10 @@ type RuntimeStatus struct { // APIGatewayStatus represents rest api gateway status APIGatewayStatus *APIGatewayStatus `json:"apiGateway,omitempty"` + + // MountTime represents time last mount happened + // if Mounttime is early than master starting time, remount will be required + MountTime metav1.Time `json:"mountTime,omitempty"` } type RuntimePhase string diff --git a/pkg/ddc/alluxio/operations/base.go b/pkg/ddc/alluxio/operations/base.go index 0d4fd573493..53f5cf45e58 100644 --- a/pkg/ddc/alluxio/operations/base.go +++ b/pkg/ddc/alluxio/operations/base.go @@ -26,6 +26,8 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/go-logr/logr" + + "github.com/fluid-cloudnative/fluid/pkg/utils" ) type AlluxioFileUtils struct { @@ -293,6 +295,28 @@ func (a AlluxioFileUtils) IsMounted(alluxioPath string) (mounted bool, err error return mounted, err } +func (a AlluxioFileUtils) FindUnmountedAlluxioPaths(alluxioPaths []string) ([]string, error) { + var ( + command = []string{"alluxio", "fs", "mount"} + stdout string + stderr string + ) + + stdout, stderr, err := a.exec(command, true) + if err != nil { + return []string{}, fmt.Errorf("execute command %v with expectedErr: %v stdout %s and stderr %s", command, err, stdout, stderr) + } + + results := strings.Split(stdout, "\n") + var mountedPaths []string + for _, line := range results { + fields := strings.Fields(line) + mountedPaths = append(mountedPaths, fields[2]) + } + + return utils.SubtractString(alluxioPaths, mountedPaths), err +} + // Check if the Alluxio is ready by running `alluxio fsadmin report` command func (a AlluxioFileUtils) Ready() (ready bool) { var ( diff --git a/pkg/ddc/alluxio/operations/base_test.go b/pkg/ddc/alluxio/operations/base_test.go index ea80a9e8d3f..40142752cef 100644 --- a/pkg/ddc/alluxio/operations/base_test.go +++ b/pkg/ddc/alluxio/operations/base_test.go @@ -490,6 +490,55 @@ func TestAlluxioFileUtils_IsMounted(t *testing.T) { } } +func TestAlluxioFileUtils_FindUnmountedAlluxioPaths(t *testing.T) { + const returnMessage = `s3://bucket/path/train on /cache (s3, capacity=-1B, used=-1B, not read-only, not shared, properties={alluxio.underfs.s3.inherit.acl=false, alluxio.underfs.s3.endpoint=s3endpoint, aws.secretKey=, aws.accessKeyId=}) +/underFSStorage on / (local, capacity=0B, used=0B, not read-only, not shared, properties={})` + + ExecCommon := func(a AlluxioFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) { + return returnMessage, "", nil + } + a := &AlluxioFileUtils{log: logf.NullLogger{}} + + err := gohook.Hook(AlluxioFileUtils.exec, ExecCommon, nil) + if err != nil { + t.Fatal(err.Error()) + } + var testCases = []struct { + alluxioPaths []string + expectedUnmountedPaths []string + }{ + { + alluxioPaths: []string{"/cache"}, + expectedUnmountedPaths: []string{}, + }, + { + alluxioPaths: []string{"/cache", "/cache2"}, + expectedUnmountedPaths: []string{"/cache2"}, + }, + { + alluxioPaths: []string{}, + expectedUnmountedPaths: []string{}, + }, + { + alluxioPaths: []string{"/cache2"}, + expectedUnmountedPaths: []string{"/cache2"}, + }, + } + for index, test := range testCases { + unmountedPaths, err := a.FindUnmountedAlluxioPaths(test.alluxioPaths) + if err != nil { + t.Errorf("%d check failure, want nil, got err: %v", index, err) + return + } + + if ( len(unmountedPaths) != 0 || len(test.expectedUnmountedPaths) != 0 ) && + !reflect.DeepEqual(unmountedPaths,test.expectedUnmountedPaths) { + t.Errorf("%d check failure, want: %s, got: %s", index, strings.Join(test.expectedUnmountedPaths, ","), strings.Join(unmountedPaths, ",")) + return + } + } +} + func TestAlluxioFileUtils_Ready(t *testing.T) { ExecCommon := func(a AlluxioFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) { return "Alluxio cluster summary: ", "", nil diff --git a/pkg/ddc/alluxio/ufs.go b/pkg/ddc/alluxio/ufs.go index e51c088737b..33bd4323036 100644 --- a/pkg/ddc/alluxio/ufs.go +++ b/pkg/ddc/alluxio/ufs.go @@ -16,8 +16,11 @@ limitations under the License. package alluxio import ( + "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/utils" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // UsedStorageBytes returns used storage size of Alluxio in bytes @@ -90,6 +93,9 @@ func (e *AlluxioEngine) ShouldUpdateUFS() (ufsToUpdate *utils.UFSToUpdate) { ufsToUpdate = utils.NewUFSToUpdate(dataset) ufsToUpdate.AnalyzePathsDelta() + // 3. for hostpath ufs mount, check if all mountpoints have been mounted + e.checkIfRemountRequired(ufsToUpdate) + return } @@ -118,3 +124,49 @@ func (e *AlluxioEngine) UpdateOnUFSChange(ufsToUpdate *utils.UFSToUpdate) (updat updateReady = true return } + +func (e *AlluxioEngine) checkIfRemountRequired(ufsToUpdate *utils.UFSToUpdate) { + runtime, err := e.getRuntime() + if err != nil { + e.Log.Error(err, "checkIfRemountRequired", "runtime", e.name) + return + } + + masterPodName, masterContainerName := e.getMasterPodInfo() + masterPod, err := e.getMasterPod(masterPodName, e.namespace) + if err != nil { + e.Log.Error(err, "checkIfRemountRequired", "master pod", e.name) + return + } + + var startedAt *v1.Time + for _, containerStatus := range masterPod.Status.ContainerStatuses { + if containerStatus.Name == masterContainerName { + if containerStatus.State.Running == nil{ + e.Log.Error(fmt.Errorf("Container is not running"), "checkIfRemountRequired", "master pod", masterPodName) + return + } else { + startedAt = &containerStatus.State.Running.StartedAt + break + } + } + } + + // If mounttime is earlier than master container starttime, remount is necessary + if startedAt != nil && runtime.Status.MountTime.Before(startedAt) { + e.Log.Info("remount on master restart", "alluxioruntime", e.name) + + unmountedPaths, err := e.FindUnmountedUFS() + if err != nil { + e.Log.Error(err, "Failed in finding unmounted ufs") + return + } + + if len(unmountedPaths) != 0 { + ufsToUpdate.AddMountPaths(unmountedPaths) + } else { + // if no path can be mounted, set mountTime to be now + e.updateMountTime() + } + } +} diff --git a/pkg/ddc/alluxio/ufs_internal.go b/pkg/ddc/alluxio/ufs_internal.go index 940eb78057f..b473f0f0dd7 100644 --- a/pkg/ddc/alluxio/ufs_internal.go +++ b/pkg/ddc/alluxio/ufs_internal.go @@ -18,13 +18,17 @@ package alluxio import ( "context" "fmt" + "reflect" + "time" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/pkg/errors" - "reflect" + + "k8s.io/client-go/util/retry" ) func (e *AlluxioEngine) usedStorageBytesInternal() (value int64, err error) { @@ -99,6 +103,42 @@ func (e *AlluxioEngine) shouldMountUFS() (should bool, err error) { return should, err } +// FindUnmountedUFS return if UFSs not mounted +func (e *AlluxioEngine) FindUnmountedUFS() (unmountedPaths []string, err error) { + dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) + e.Log.Info("get dataset info", "dataset", dataset) + if err != nil { + return unmountedPaths, err + } + + podName, containerName := e.getMasterPodInfo() + fileUtils := operations.NewAlluxioFileUtils(podName, containerName, e.namespace, e.Log) + + ready := fileUtils.Ready() + if !ready { + err = fmt.Errorf("the UFS is not ready") + return unmountedPaths, err + } + + var alluxioPaths []string + // Check if any of the Mounts has not been mounted in Alluxio + for _, mount := range dataset.Spec.Mounts { + if common.IsFluidNativeScheme(mount.MountPoint) { + // No need for a mount point with Fluid native scheme('local://' and 'pvc://') to be mounted + continue + } + alluxioPath := utils.UFSPathBuilder{}.GenAlluxioMountPath(mount, dataset.Spec.Mounts) + alluxioPaths = append(alluxioPaths, alluxioPath) + } + + // For fluid native schema, skip mount check to avoid unnecessary system call + if len(alluxioPaths) == 0{ + return + } + + return fileUtils.FindUnmountedAlluxioPaths(alluxioPaths) +} + func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err error) { dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) if err != nil { @@ -113,6 +153,7 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err return fmt.Errorf("the UFS is not ready, namespace:%s,name:%s", e.namespace, e.name) } + everMounted := false // Iterate all the mount points, do mount if the mount point is in added array // TODO: not allow to edit FluidNativeScheme MountPoint for _, mount := range dataset.Spec.Mounts { @@ -154,6 +195,8 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err if err != nil { return err } + + everMounted = true } } @@ -183,6 +226,10 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err return nil } + if everMounted { + e.updateMountTime() + } + return nil } @@ -201,6 +248,7 @@ func (e *AlluxioEngine) mountUFS() (err error) { return fmt.Errorf("the UFS is not ready") } + everMounted := false // Iterate all the mount points, do mount if the mount point is not Fluid-native(e.g. Hostpath or PVC) for _, mount := range dataset.Spec.Mounts { mount := mount @@ -226,9 +274,15 @@ func (e *AlluxioEngine) mountUFS() (err error) { if err != nil { return err } + + everMounted = true } + } + if everMounted { + e.updateMountTime() } + return nil } @@ -260,3 +314,27 @@ func (e *AlluxioEngine) genUFSMountOptions(m datav1alpha1.Mount) (map[string]str return mOptions, nil } + +func (e *AlluxioEngine) updateMountTime() { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + runtime, err := e.getRuntime() + if err != nil { + return err + } + + runtimeToUpdate := runtime.DeepCopy() + runtimeToUpdate.Status.MountTime.Time = time.Now() + + if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { + err = e.Client.Status().Update(context.TODO(), runtimeToUpdate) + } else { + e.Log.Info("Do nothing because the runtime status is not changed.") + } + + return err + }) + + if err != nil { + e.Log.Error(err, "UpdateMountTime", "", e.name) + } +} diff --git a/pkg/ddc/alluxio/ufs_test.go b/pkg/ddc/alluxio/ufs_test.go index 51d1dbdb0ed..42fcb07f74d 100644 --- a/pkg/ddc/alluxio/ufs_test.go +++ b/pkg/ddc/alluxio/ufs_test.go @@ -16,13 +16,16 @@ limitations under the License. package alluxio import ( + "strings" "testing" + "time" "reflect" . "github.com/agiledragon/gomonkey" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/go-logr/logr" @@ -314,3 +317,287 @@ func TestPrepareUFS(t *testing.T) { }) } } + +func TestFindUnmountedUFS(t *testing.T) { + + type fields struct { + mountPoints []datav1alpha1.Mount + wantedUnmountedPaths []string + } + + tests := []fields { + { + mountPoints: []datav1alpha1.Mount{ + { + MountPoint: "s3://bucket/path/train", + Path: "/path1", + }, + }, + wantedUnmountedPaths: []string{"/path1"}, + }, + { + mountPoints: []datav1alpha1.Mount{ + { + MountPoint: "local://mnt/test", + Path: "/path2", + }, + }, + wantedUnmountedPaths: []string{}, + }, + { + mountPoints: []datav1alpha1.Mount{ + { + MountPoint: "s3://bucket/path/train", + Path: "/path1", + }, + { + MountPoint: "local://mnt/test", + Path: "/path2", + }, + { + MountPoint: "hdfs://endpoint/path/train", + Path: "/path3", + }, + }, + wantedUnmountedPaths: []string{"/path1", "/path3"}, + }, + } + + for index, test := range tests { + t.Run("test", func(t *testing.T) { + s := runtime.NewScheme() + runtime := datav1alpha1.AlluxioRuntime{} + dataset := datav1alpha1.Dataset{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: test.mountPoints, + }, + } + + s.AddKnownTypes(datav1alpha1.GroupVersion, &runtime) + s.AddKnownTypes(datav1alpha1.GroupVersion, &dataset) + _ = corev1.AddToScheme(s) + mockClient := fake.NewFakeClientWithScheme(s, &runtime, &dataset) + + var afsUtils operations.AlluxioFileUtils + patch1 := ApplyMethod(reflect.TypeOf(afsUtils), "Ready", func(_ operations.AlluxioFileUtils) bool { + return true + }) + defer patch1.Reset() + + patch2 := ApplyMethod(reflect.TypeOf(afsUtils), "FindUnmountedAlluxioPaths", func(_ operations.AlluxioFileUtils, alluxioPaths []string) ([]string, error) { + return alluxioPaths, nil + }) + defer patch2.Reset() + + e := &AlluxioEngine{ + runtime: &runtime, + name: "test", + namespace: "default", + Log: log.NullLogger{}, + Client: mockClient, + MetadataSyncDoneCh: nil, + } + + unmountedPaths, err := e.FindUnmountedUFS() + if err != nil{ + t.Errorf("AlluxioEngine.FindUnmountedUFS() error = %v", err) + return + } + if (len(unmountedPaths) != 0 || len(test.wantedUnmountedPaths) != 0 ) && + !reflect.DeepEqual(unmountedPaths,test.wantedUnmountedPaths) { + t.Errorf("%d check failure, want: %s, got: %s", index, strings.Join(test.wantedUnmountedPaths, ","), strings.Join(unmountedPaths, ",")) + return + } + }) + } +} + +func TestUpdateMountTime(t *testing.T) { + yesterday := time.Now().AddDate( 0, 0, -1) + + type fields struct { + runtime *datav1alpha1.AlluxioRuntime + } + + tests := []fields { + { + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Status: datav1alpha1.RuntimeStatus{ + MountTime: v1.Time{ + Time: yesterday, + }, + }, + }, + }, + } + + for index, test := range tests { + t.Run("test", func(t *testing.T) { + s := runtime.NewScheme() + s.AddKnownTypes(datav1alpha1.GroupVersion, test.runtime) + _ = corev1.AddToScheme(s) + mockClient := fake.NewFakeClientWithScheme(s, test.runtime) + + e := &AlluxioEngine{ + runtime: test.runtime, + name: "test", + namespace: "default", + Log: log.NullLogger{}, + Client: mockClient, + MetadataSyncDoneCh: nil, + } + + e.updateMountTime() + runtime, _ := e.getRuntime() + if runtime.Status.MountTime.Time.Equal(yesterday) { + t.Errorf("%d check failure, got: %v, unexpected: %v", index, runtime.Status.MountTime.Time, yesterday) + return + } + }) + } +} + +func TestCheckIfRemountRequired(t *testing.T) { + yesterday := time.Now().AddDate( 0, 0, -1) + + type fields struct { + runtime *datav1alpha1.AlluxioRuntime + pod *corev1.Pod + wanted []string + } + + tests := []fields { + { + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Status: datav1alpha1.RuntimeStatus{ + MountTime: v1.Time{ + Time: yesterday, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-master-0", + Namespace: "default", + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "alluxio-master", + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: v1.Time{ + Time: yesterday.AddDate(0, 0, 1), + }, + }, + }, + }, + }, + }, + }, + wanted: []string { + "/path", + }, + }, + { + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Status: datav1alpha1.RuntimeStatus{ + MountTime: v1.Time{ + Time: yesterday, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-master-0", + Namespace: "default", + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "alluxio-master", + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: v1.Time{ + Time: yesterday.AddDate(0, 0, -1), + }, + }, + }, + }, + }, + }, + }, + wanted: []string {}, + }, + } + + dataset := datav1alpha1.Dataset{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + MountPoint: "s3://bucket/path/train", + Path: "/path", + }, + }, + }, + } + + for index, test := range tests { + t.Run("test", func(t *testing.T) { + s := runtime.NewScheme() + s.AddKnownTypes(datav1alpha1.GroupVersion, test.runtime) + s.AddKnownTypes(datav1alpha1.GroupVersion, &dataset) + s.AddKnownTypes(corev1.SchemeGroupVersion, test.pod) + _ = corev1.AddToScheme(s) + mockClient := fake.NewFakeClientWithScheme(s, test.runtime, &dataset, test.pod) + + e := &AlluxioEngine{ + runtime: test.runtime, + name: "test", + namespace: "default", + Log: log.NullLogger{}, + Client: mockClient, + MetadataSyncDoneCh: nil, + } + + var afsUtils operations.AlluxioFileUtils + patch1 := ApplyMethod(reflect.TypeOf(afsUtils), "Ready", func(_ operations.AlluxioFileUtils) bool { + return true + }) + defer patch1.Reset() + + patch2 := ApplyMethod(reflect.TypeOf(afsUtils), "FindUnmountedAlluxioPaths", func(_ operations.AlluxioFileUtils, alluxioPaths []string) ([]string, error) { + return alluxioPaths, nil + }) + defer patch2.Reset() + + ufsToUpdate := utils.NewUFSToUpdate(&dataset) + e.checkIfRemountRequired(ufsToUpdate) + if (len(ufsToUpdate.ToAdd()) != 0 || len(test.wanted) != 0 ) && + !reflect.DeepEqual(ufsToUpdate.ToAdd(),test.wanted) { + t.Errorf("%d check failure, got: %v, expected: %s", index, ufsToUpdate.ToAdd(),test.wanted) + return + } + }) + } +} diff --git a/pkg/ddc/alluxio/utils.go b/pkg/ddc/alluxio/utils.go index ffccaff7ee9..9fc3d735a0a 100644 --- a/pkg/ddc/alluxio/utils.go +++ b/pkg/ddc/alluxio/utils.go @@ -54,6 +54,16 @@ func (e *AlluxioEngine) getRuntime() (*datav1alpha1.AlluxioRuntime, error) { return &runtime, nil } +func (e *AlluxioEngine) getMasterPod(name string, namespace string) (pod *v1.Pod, err error) { + pod = &v1.Pod{} + err = e.Client.Get(context.TODO(), types.NamespacedName{ + Namespace: namespace, + Name: name, + }, pod) + + return pod, err +} + func (e *AlluxioEngine) getMasterStatefulset(name string, namespace string) (master *appsv1.StatefulSet, err error) { master = &appsv1.StatefulSet{} err = e.Client.Get(context.TODO(), types.NamespacedName{ diff --git a/pkg/ddc/alluxio/utils_test.go b/pkg/ddc/alluxio/utils_test.go index c8a11ac57d0..cba7102322a 100644 --- a/pkg/ddc/alluxio/utils_test.go +++ b/pkg/ddc/alluxio/utils_test.go @@ -412,6 +412,68 @@ func TestGetRuntime(t *testing.T) { } } +func TestGetMasterPod(t *testing.T) { + type fields struct { + runtime *datav1alpha1.AlluxioRuntime + name string + namespace string + } + tests := []struct { + name string + fields fields + want *corev1.Pod + wantErr bool + }{ + { + name: "test", + fields: fields{ + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: v1.ObjectMeta{ + Name: "spark-master", + Namespace: "default", + }, + }, + name: "spark-master", + namespace: "default", + }, + want: &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "spark-master", + Namespace: "default", + }, + TypeMeta: v1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := runtime.NewScheme() + s.AddKnownTypes(datav1alpha1.GroupVersion, tt.fields.runtime) + s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.Pod{}) + _ = corev1.AddToScheme(s) + mockClient := fake.NewFakeClientWithScheme(s, tt.fields.runtime, tt.want) + e := &AlluxioEngine{ + runtime: tt.fields.runtime, + name: tt.fields.name, + namespace: tt.fields.namespace, + Client: mockClient, + } + gotMaster, err := e.getMasterPod(tt.fields.name, tt.fields.namespace) + if (err != nil) != tt.wantErr { + t.Errorf("AlluxioEngine.getMasterPod() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotMaster, tt.want) { + t.Errorf("AlluxioEngine.getMasterPod() = %#v, want %#v", gotMaster, tt.want) + } + }) + } +} + func TestGetMasterStatefulset(t *testing.T) { type fields struct { runtime *datav1alpha1.AlluxioRuntime diff --git a/pkg/utils/dataset.go b/pkg/utils/dataset.go index 2162f1f4ca4..4314632434c 100644 --- a/pkg/utils/dataset.go +++ b/pkg/utils/dataset.go @@ -184,3 +184,22 @@ func (u *UFSToUpdate) ToAdd() []string { func (u *UFSToUpdate) ToRemove() []string { return u.toRemove } + +// AddMountPaths add mounted path to ToAdd +func (u *UFSToUpdate) AddMountPaths(mountPaths []string) { + if len(u.toAdd) == 0{ + u.toAdd = mountPaths + return + } + + set := make(map[string]struct{}, len(u.toAdd)) + for _, i := range u.toAdd { + set[i] = struct{}{} + } + + for _, mountPath := range mountPaths{ + _, ok := set[mountPath]; if !ok{ + u.toAdd = append(u.toAdd, mountPath) + } + } +} diff --git a/pkg/utils/dataset_test.go b/pkg/utils/dataset_test.go index 0ceb4339132..f7aaa849fcc 100644 --- a/pkg/utils/dataset_test.go +++ b/pkg/utils/dataset_test.go @@ -548,3 +548,45 @@ func TestUfsToUpdate(t *testing.T) { } } + +func TestAddMountPaths(t *testing.T) { + testCases := []struct { + originAdd []string + toAdd []string + result []string + }{ + { + originAdd: []string{"/path1"}, + toAdd: []string{"/path2"}, + result: []string{"/path1", "/path2"}, + }, + { + originAdd: []string{"/path1"}, + toAdd: []string{"/path1"}, + result: []string{"/path1"}, + }, + { + originAdd: []string{}, + toAdd: []string{"/path1"}, + result: []string{"/path1"}, + }, + { + originAdd: []string{"/path1"}, + toAdd: []string{}, + result: []string{"/path1"}, + }, + } + + for k, item := range testCases { + ufsToUpdate := NewUFSToUpdate(&datav1alpha1.Dataset{}) + ufsToUpdate.toAdd = item.originAdd + + ufsToUpdate.AddMountPaths(item.toAdd) + + if len(item.result) != 0 || len(ufsToUpdate.ToAdd()) != 0 { + if !reflect.DeepEqual(item.result, ufsToUpdate.ToAdd()) { + t.Errorf("%d check fail, expected mountpath is %s, get %s", k, item.result, ufsToUpdate.ToAdd()) + } + } + } +}