Skip to content

Commit

Permalink
Add remount check for hostpath mount during sync (fluid-cloudnative#1340
Browse files Browse the repository at this point in the history
)

* Add remount check for hostpath mount during sync

Signed-off-by: nizifan <[email protected]>

* Fix CI issue & rename function

Signed-off-by: nizifan <[email protected]>

* Add ut & fix potential leak of ufstoupdate.toadd

Signed-off-by: nizifan <[email protected]>

* improve test coverage & address comments - add new line

Signed-off-by: nizifan <[email protected]>

* Fix ci issue

Signed-off-by: nizifan <[email protected]>

* Only perform remount operation when mounttime ealier than master starting time

Signed-off-by: nizifan <[email protected]>

* Fix ci issue

Signed-off-by: nizifan <[email protected]>

* add ut

Signed-off-by: nizifan <[email protected]>

* rename function

Signed-off-by: nizifan <[email protected]>

Co-authored-by: nizifan <[email protected]>
Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
2 people authored and zwwhdls committed Mar 18, 2022
1 parent e69c034 commit d25d80b
Show file tree
Hide file tree
Showing 10 changed files with 628 additions and 1 deletion.
4 changes: 4 additions & 0 deletions api/v1alpha1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type RuntimeStatus struct {

// APIGatewayStatus represents rest api gateway status
APIGatewayStatus *APIGatewayStatus `json:"apiGateway,omitempty"`

// MountTime represents time last mount happened
// if Mounttime is early than master starting time, remount will be required
MountTime metav1.Time `json:"mountTime,omitempty"`
}

type RuntimePhase string
Expand Down
24 changes: 24 additions & 0 deletions pkg/ddc/alluxio/operations/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/go-logr/logr"

"github.com/fluid-cloudnative/fluid/pkg/utils"
)

type AlluxioFileUtils struct {
Expand Down Expand Up @@ -293,6 +295,28 @@ func (a AlluxioFileUtils) IsMounted(alluxioPath string) (mounted bool, err error
return mounted, err
}

func (a AlluxioFileUtils) FindUnmountedAlluxioPaths(alluxioPaths []string) ([]string, error) {
var (
command = []string{"alluxio", "fs", "mount"}
stdout string
stderr string
)

stdout, stderr, err := a.exec(command, true)
if err != nil {
return []string{}, fmt.Errorf("execute command %v with expectedErr: %v stdout %s and stderr %s", command, err, stdout, stderr)
}

results := strings.Split(stdout, "\n")
var mountedPaths []string
for _, line := range results {
fields := strings.Fields(line)
mountedPaths = append(mountedPaths, fields[2])
}

return utils.SubtractString(alluxioPaths, mountedPaths), err
}

// Check if the Alluxio is ready by running `alluxio fsadmin report` command
func (a AlluxioFileUtils) Ready() (ready bool) {
var (
Expand Down
49 changes: 49 additions & 0 deletions pkg/ddc/alluxio/operations/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,55 @@ func TestAlluxioFileUtils_IsMounted(t *testing.T) {
}
}

func TestAlluxioFileUtils_FindUnmountedAlluxioPaths(t *testing.T) {
const returnMessage = `s3://bucket/path/train on /cache (s3, capacity=-1B, used=-1B, not read-only, not shared, properties={alluxio.underfs.s3.inherit.acl=false, alluxio.underfs.s3.endpoint=s3endpoint, aws.secretKey=, aws.accessKeyId=})
/underFSStorage on / (local, capacity=0B, used=0B, not read-only, not shared, properties={})`

ExecCommon := func(a AlluxioFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) {
return returnMessage, "", nil
}
a := &AlluxioFileUtils{log: logf.NullLogger{}}

err := gohook.Hook(AlluxioFileUtils.exec, ExecCommon, nil)
if err != nil {
t.Fatal(err.Error())
}
var testCases = []struct {
alluxioPaths []string
expectedUnmountedPaths []string
}{
{
alluxioPaths: []string{"/cache"},
expectedUnmountedPaths: []string{},
},
{
alluxioPaths: []string{"/cache", "/cache2"},
expectedUnmountedPaths: []string{"/cache2"},
},
{
alluxioPaths: []string{},
expectedUnmountedPaths: []string{},
},
{
alluxioPaths: []string{"/cache2"},
expectedUnmountedPaths: []string{"/cache2"},
},
}
for index, test := range testCases {
unmountedPaths, err := a.FindUnmountedAlluxioPaths(test.alluxioPaths)
if err != nil {
t.Errorf("%d check failure, want nil, got err: %v", index, err)
return
}

if ( len(unmountedPaths) != 0 || len(test.expectedUnmountedPaths) != 0 ) &&
!reflect.DeepEqual(unmountedPaths,test.expectedUnmountedPaths) {
t.Errorf("%d check failure, want: %s, got: %s", index, strings.Join(test.expectedUnmountedPaths, ","), strings.Join(unmountedPaths, ","))
return
}
}
}

func TestAlluxioFileUtils_Ready(t *testing.T) {
ExecCommon := func(a AlluxioFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) {
return "Alluxio cluster summary: ", "", nil
Expand Down
52 changes: 52 additions & 0 deletions pkg/ddc/alluxio/ufs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ limitations under the License.
package alluxio

import (
"fmt"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/utils"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// UsedStorageBytes returns used storage size of Alluxio in bytes
Expand Down Expand Up @@ -90,6 +93,9 @@ func (e *AlluxioEngine) ShouldUpdateUFS() (ufsToUpdate *utils.UFSToUpdate) {
ufsToUpdate = utils.NewUFSToUpdate(dataset)
ufsToUpdate.AnalyzePathsDelta()

// 3. for hostpath ufs mount, check if all mountpoints have been mounted
e.checkIfRemountRequired(ufsToUpdate)

return
}

Expand Down Expand Up @@ -118,3 +124,49 @@ func (e *AlluxioEngine) UpdateOnUFSChange(ufsToUpdate *utils.UFSToUpdate) (updat
updateReady = true
return
}

func (e *AlluxioEngine) checkIfRemountRequired(ufsToUpdate *utils.UFSToUpdate) {
runtime, err := e.getRuntime()
if err != nil {
e.Log.Error(err, "checkIfRemountRequired", "runtime", e.name)
return
}

masterPodName, masterContainerName := e.getMasterPodInfo()
masterPod, err := e.getMasterPod(masterPodName, e.namespace)
if err != nil {
e.Log.Error(err, "checkIfRemountRequired", "master pod", e.name)
return
}

var startedAt *v1.Time
for _, containerStatus := range masterPod.Status.ContainerStatuses {
if containerStatus.Name == masterContainerName {
if containerStatus.State.Running == nil{
e.Log.Error(fmt.Errorf("Container is not running"), "checkIfRemountRequired", "master pod", masterPodName)
return
} else {
startedAt = &containerStatus.State.Running.StartedAt
break
}
}
}

// If mounttime is earlier than master container starttime, remount is necessary
if startedAt != nil && runtime.Status.MountTime.Before(startedAt) {
e.Log.Info("remount on master restart", "alluxioruntime", e.name)

unmountedPaths, err := e.FindUnmountedUFS()
if err != nil {
e.Log.Error(err, "Failed in finding unmounted ufs")
return
}

if len(unmountedPaths) != 0 {
ufsToUpdate.AddMountPaths(unmountedPaths)
} else {
// if no path can be mounted, set mountTime to be now
e.updateMountTime()
}
}
}
80 changes: 79 additions & 1 deletion pkg/ddc/alluxio/ufs_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package alluxio
import (
"context"
"fmt"
"reflect"
"time"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/pkg/errors"
"reflect"

"k8s.io/client-go/util/retry"
)

func (e *AlluxioEngine) usedStorageBytesInternal() (value int64, err error) {
Expand Down Expand Up @@ -99,6 +103,42 @@ func (e *AlluxioEngine) shouldMountUFS() (should bool, err error) {
return should, err
}

// FindUnmountedUFS return if UFSs not mounted
func (e *AlluxioEngine) FindUnmountedUFS() (unmountedPaths []string, err error) {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
e.Log.Info("get dataset info", "dataset", dataset)
if err != nil {
return unmountedPaths, err
}

podName, containerName := e.getMasterPodInfo()
fileUtils := operations.NewAlluxioFileUtils(podName, containerName, e.namespace, e.Log)

ready := fileUtils.Ready()
if !ready {
err = fmt.Errorf("the UFS is not ready")
return unmountedPaths, err
}

var alluxioPaths []string
// Check if any of the Mounts has not been mounted in Alluxio
for _, mount := range dataset.Spec.Mounts {
if common.IsFluidNativeScheme(mount.MountPoint) {
// No need for a mount point with Fluid native scheme('local://' and 'pvc://') to be mounted
continue
}
alluxioPath := utils.UFSPathBuilder{}.GenAlluxioMountPath(mount, dataset.Spec.Mounts)
alluxioPaths = append(alluxioPaths, alluxioPath)
}

// For fluid native schema, skip mount check to avoid unnecessary system call
if len(alluxioPaths) == 0{
return
}

return fileUtils.FindUnmountedAlluxioPaths(alluxioPaths)
}

func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err error) {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
Expand All @@ -113,6 +153,7 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err
return fmt.Errorf("the UFS is not ready, namespace:%s,name:%s", e.namespace, e.name)
}

everMounted := false
// Iterate all the mount points, do mount if the mount point is in added array
// TODO: not allow to edit FluidNativeScheme MountPoint
for _, mount := range dataset.Spec.Mounts {
Expand Down Expand Up @@ -154,6 +195,8 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err
if err != nil {
return err
}

everMounted = true
}
}

Expand Down Expand Up @@ -183,6 +226,10 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err
return nil
}

if everMounted {
e.updateMountTime()
}

return nil
}

Expand All @@ -201,6 +248,7 @@ func (e *AlluxioEngine) mountUFS() (err error) {
return fmt.Errorf("the UFS is not ready")
}

everMounted := false
// Iterate all the mount points, do mount if the mount point is not Fluid-native(e.g. Hostpath or PVC)
for _, mount := range dataset.Spec.Mounts {
mount := mount
Expand All @@ -226,9 +274,15 @@ func (e *AlluxioEngine) mountUFS() (err error) {
if err != nil {
return err
}

everMounted = true
}
}

if everMounted {
e.updateMountTime()
}

return nil
}

Expand Down Expand Up @@ -260,3 +314,27 @@ func (e *AlluxioEngine) genUFSMountOptions(m datav1alpha1.Mount) (map[string]str

return mOptions, nil
}

func (e *AlluxioEngine) updateMountTime() {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
runtime, err := e.getRuntime()
if err != nil {
return err
}

runtimeToUpdate := runtime.DeepCopy()
runtimeToUpdate.Status.MountTime.Time = time.Now()

if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), runtimeToUpdate)
} else {
e.Log.Info("Do nothing because the runtime status is not changed.")
}

return err
})

if err != nil {
e.Log.Error(err, "UpdateMountTime", "", e.name)
}
}
Loading

0 comments on commit d25d80b

Please sign in to comment.