
Support CSI start AlluxioFuse process in separate pod #15221

Merged
merged 18 commits into from Apr 12, 2022
16 changes: 11 additions & 5 deletions docs/en/deploy/Running-Alluxio-On-Kubernetes.md
@@ -1473,12 +1473,12 @@ Here are some common properties that you can customize:
<td>The path in Alluxio which will be mounted</td>
</tr>
<tr>
<td>mountPath</td>
<td>The path that Alluxio will be mounted to in the application container</td>
<td>mountInPod</td>
<td>Set to true to launch the AlluxioFuse process in a separate alluxio-fuse pod. Otherwise the Fuse process runs in the same container as the CSI node server</td>
</tr>
<tr>
<td>javaOptions</td>
<td>The customized options which will be passed to the Fuse daemon</td>
<td>mountPath</td>
<td>The path that Alluxio will be mounted to in the application container</td>
</tr>
<tr>
<td>mountOptions</td>
@@ -1496,11 +1496,12 @@ Modify or add any configuration properties as required, then create the respecti
$ mv alluxio-csi-controller-rbac.yaml.template alluxio-csi-controller-rbac.yaml
$ mv alluxio-csi-controller.yaml.template alluxio-csi-controller.yaml
$ mv alluxio-csi-driver.yaml.template alluxio-csi-driver.yaml
$ mv alluxio-csi-fuse-configmap.yaml.template alluxio-csi-fuse-configmap.yaml
$ mv alluxio-csi-nodeplugin.yaml.template alluxio-csi-nodeplugin.yaml
```
Then run
```console
$ kubectl apply -f alluxio-csi-controller-rbac.yaml -f alluxio-csi-controller.yaml -f alluxio-csi-driver.yaml -f alluxio-csi-nodeplugin.yaml
$ kubectl apply -f alluxio-csi-controller-rbac.yaml -f alluxio-csi-controller.yaml -f alluxio-csi-driver.yaml -f alluxio-csi-fuse-configmap.yaml -f alluxio-csi-nodeplugin.yaml
```
to deploy CSI-related services.
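
To sanity-check the deployment, you can list the CSI pods. The exact pod names depend on your generated templates, so the grep pattern below is only illustrative:
```console
$ kubectl get pods | grep alluxio-csi
```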

@@ -1527,6 +1528,11 @@ $ kubectl apply -f alluxio-pvc-static.yaml
```
to deploy the resources.

Note: If `mountInPod` is set to `true`, the value of `spec.csi.volumeHandle` in `alluxio-pv.yaml`
must be unique so that CSI can distinguish the volumes. If two PVs have the same `volumeHandle`,
CSI regards them as the same volume and may not launch a Fuse pod for one of them, leaving the
application pods that use it without a working mount.
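
For reference, a minimal static PV along these lines might look as follows. This is an illustrative sketch, not the shipped template: the driver name, capacity, and `javaOptions` value are placeholders to adapt from your generated `alluxio-pv.yaml`.

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: alluxio-pv
spec:
  accessModes:
    - ReadWriteMany
  capacity:
    storage: 100Gi
  csi:
    driver: alluxio              # placeholder: use the driver name from the generated template
    volumeHandle: alluxio-pv-1   # must be unique across PVs when mountInPod is true
    volumeAttributes:
      alluxioPath: /
      mountInPod: "true"
      javaOptions: "-Dalluxio.user.metadata.cache.enabled=true"  # example option only
```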

{% endnavtab %}
{% navtab Dynamic Volume Provisioning %}

5 changes: 4 additions & 1 deletion integration/docker/csi/alluxio/controllerserver.go
@@ -30,6 +30,10 @@ type controllerServer struct {
*csicommon.DefaultControllerServer
}

/*
* With dynamic provisioning, CreateVolume() is called when a PVC is created that matches one of the StorageClasses.
*/

func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
volumeID := sanitizeVolumeID(req.GetName())

@@ -122,7 +126,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
glog.V(3).Infof("Invalid delete volume req: %v", req)
return nil, err
}
glog.V(4).Infof("Deleting volume %s", volumeID)
Contributor Author:
We don't do anything here. The log is misleading.

Contributor:
Does the returned value not trigger a deletion? If so why is the return type not a CreateVolumeResponse?
return &csi.DeleteVolumeResponse{}, nil

Contributor Author:
Actually the returned value will trigger the deletion of the PV. However, that is not happening inside this function, so logging should not be here either. Plus, we are not removing any data stored in Alluxio.


return &csi.DeleteVolumeResponse{}, nil
}
11 changes: 8 additions & 3 deletions integration/docker/csi/alluxio/driver.go
@@ -15,6 +15,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
"k8s.io/client-go/kubernetes"
)

const (
@@ -23,11 +24,13 @@
)

type driver struct {
csiDriver *csicommon.CSIDriver
nodeId, endpoint string
csiDriver *csicommon.CSIDriver
endpoint string
client kubernetes.Clientset
nodeId string
}

func NewDriver(nodeID, endpoint string) *driver {
func NewDriver(nodeID, endpoint string, client kubernetes.Clientset) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)
csiDriver := csicommon.NewCSIDriver(driverName, version, nodeID)
csiDriver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
@@ -37,6 +40,7 @@
nodeId: nodeID,
endpoint: endpoint,
csiDriver: csiDriver,
client: client,
}
}

@@ -49,6 +53,7 @@ func (d *driver) newNodeServer() *nodeServer {
return &nodeServer{
nodeId: d.nodeId,
DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver),
client: d.client,
}
}

225 changes: 210 additions & 15 deletions integration/docker/csi/alluxio/nodeserver.go
@@ -12,28 +12,61 @@
package alluxio

import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
mount "k8s.io/mount-utils"
)

type nodeServer struct {
nodeId string
client kubernetes.Clientset
*csicommon.DefaultNodeServer
nodeId string
mutex sync.Mutex
}

/*
 * When no app pod is using the PV, the first app pod that uses it triggers NodeStageVolume().
 * Only after NodeStageVolume() returns successfully is NodePublishVolume() called.
 * When the PV is already in use and a new app pod mounts it as a volume, only NodePublishVolume() is triggered.
 *
 * NodeUnpublishVolume() and NodeUnstageVolume() are the opposites of NodePublishVolume() and NodeStageVolume().
 * When the PV will still be used by other pods after an app pod terminates, only NodeUnpublishVolume() is called.
 * When the PV will no longer be in use after an app pod terminates, NodeUnpublishVolume() is called. Only after it
 * returns successfully is NodeUnstageVolume() called.
 *
 * For more details on CSI, refer to https://github.com/container-storage-interface/spec/blob/master/spec.md
 */

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if req.GetVolumeContext()["mountInPod"] == "true" {
glog.V(4).Infoln("Bind mount staging path (global mount point) to target path (pod volume path).")
return bindMountGlobalMountPointToPodVolPath(req)
}
glog.V(4).Infoln("Mount Alluxio to target path (pod volume path) with AlluxioFuse in CSI node server.")
return newFuseProcessInNodeServer(req)
}

func newFuseProcessInNodeServer(req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
targetPath := req.GetTargetPath()

notMnt, err := ns.ensureMountPoint(targetPath)
notMnt, err := ensureMountPoint(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@@ -87,45 +120,188 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
func bindMountGlobalMountPointToPodVolPath(req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
stagingPath := req.GetStagingTargetPath()

command := exec.Command("/opt/alluxio/integration/fuse/bin/alluxio-fuse",
"unmount", targetPath,
)
notMnt, err := ensureMountPoint(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if !notMnt {
glog.V(4).Infoln("target path is already mounted")
return &csi.NodePublishVolumeResponse{}, nil
}

args := []string{"--bind", stagingPath, targetPath}
command := exec.Command("mount", args...)
glog.V(4).Infoln(command)
stdoutStderr, err := command.CombinedOutput()
glog.V(4).Infoln(string(stdoutStderr))
if err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if strings.Contains(err.Error(), "invalid argument") {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.NodePublishVolumeResponse{}, nil
}

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
command := exec.Command("/opt/alluxio/integration/fuse/bin/alluxio-fuse", "umount", targetPath)
glog.V(4).Infoln(command)
stdoutStderr, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(err)
}
glog.V(4).Infoln(string(stdoutStderr))

err = mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
err = mount.CleanupMountPoint(targetPath, mount.New(""), false)
if err != nil {
glog.V(3).Infoln(err)
} else {
glog.V(4).Infof("Succeed in unmounting %s", targetPath)
}

return &csi.NodeUnpublishVolumeResponse{}, nil
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return &csi.NodeUnstageVolumeResponse{}, nil
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if req.GetVolumeContext()["mountInPod"] != "true" {
return &csi.NodeStageVolumeResponse{}, nil
}
ns.mutex.Lock()
defer ns.mutex.Unlock()

glog.V(4).Infoln("Creating Alluxio-fuse pod and mounting Alluxio to global mount point.")
fusePod, err := getAndCompleteFusePodObj(ns.nodeId, req)
if err != nil {
return nil, err
}
if _, err := ns.client.CoreV1().Pods(os.Getenv("NAMESPACE")).Create(fusePod); err != nil {
Contributor:
What if the pod was already created by a previous request? Make it idempotent?
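
One way such a create could be made idempotent is to treat an "already exists" error from the API server as success. The following is only a sketch against the older client-go signature used in this PR, not code from the change itself; the helper name is hypothetical:

```go
package alluxio

import (
	"github.com/golang/glog"
	v1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/kubernetes"
)

// Sketch only: tolerate a Fuse pod left over from a previous NodeStageVolume
// attempt, so that a retried Create does not fail with "already exists".
func createFusePodIdempotent(client kubernetes.Clientset, namespace string, fusePod *v1.Pod) error {
	if _, err := client.CoreV1().Pods(namespace).Create(fusePod); err != nil {
		if k8serrors.IsAlreadyExists(err) {
			glog.V(4).Infof("Fuse pod %v already exists; reusing it.", fusePod.Name)
			return nil
		}
		return err
	}
	return nil
}
```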

return nil, status.Errorf(codes.Internal, "Failed to launch Fuse Pod at %v.\n%v", ns.nodeId, err.Error())
}
glog.V(4).Infoln("Successfully creating Fuse pod.")

// Wait for alluxio-fuse pod finishing mount to global mount point
retry, err := strconv.Atoi(os.Getenv("FAILURE_THRESHOLD"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert failure threshold %v to int.", os.Getenv("FAILURE_THRESHOLD"))
}
timeout, err := strconv.Atoi(os.Getenv("PERIOD_SECONDS"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert period seconds %v to int.", os.Getenv("PERIOD_SECONDS"))
}
for i:= 0; i < retry; i++ {
time.Sleep(time.Duration(timeout) * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio is not mounted in %v seconds.", i * timeout))
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
Comment on lines +195 to +213
Contributor (@Binyang2014, Apr 11, 2022):
I am thinking we can leverage the Kubernetes retry policy: let this method return an error if the fuse daemon is not ready, and k8s will retry automatically, so we don't need to write this logic on our own.

Contributor Author:
This will be in the next step.

Contributor:
I mean it seems we can simply write it as follows:

Suggested change
retry, err := strconv.Atoi(os.Getenv("FAILURE_THRESHOLD"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert failure threshold %v to int.", os.Getenv("FAILURE_THRESHOLD"))
}
timeout, err := strconv.Atoi(os.Getenv("PERIOD_SECONDS"))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Cannot convert period seconds %v to int.", os.Getenv("PERIOD_SECONDS"))
}
for i:= 0; i < retry; i++ {
time.Sleep(time.Duration(timeout) * time.Second)
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio is not mounted in %v seconds.", i * timeout))
}
if len(stdout) > 0 {
return &csi.NodeStageVolumeResponse{}, nil
}
}
command := exec.Command("bash", "-c", fmt.Sprintf("mount | grep %v | grep alluxio-fuse", req.GetStagingTargetPath()))
stdout, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(fmt.Sprintf("Alluxio mount point is not ready"))
return err
}

Contributor Author:
You mean if Fuse is not ready we just return the error and let CSI call this method again? But later calls will first find that the pod already exists, directly return success, and won't check the mount point again. Am I interpreting it right?

Contributor:
OK, so we'd better use a pod readiness probe. If the pod is not ready, we return an error directly and let CSI call this method again; if it is already ready, we return success. We should not rely on whether the pod exists to pass the check. Does that make sense?

Contributor Author:
Yes, it makes sense. Just to clarify, here we are checking whether AlluxioFuse has mounted Alluxio to the mount point, not whether the pod exists. I will work on the readiness probe soon.
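
A readiness probe along the lines discussed here could be declared on the alluxio-fuse container. The following is a sketch with placeholder timings, assuming the probe can rely on the `MOUNT_POINT` environment variable that the node server already sets on the container:

```yaml
# Sketch of a readiness probe for the alluxio-fuse container (timings are placeholders).
readinessProbe:
  exec:
    command: ["bash", "-c", "mount | grep $MOUNT_POINT | grep alluxio-fuse"]
  initialDelaySeconds: 5
  periodSeconds: 5       # cf. the PERIOD_SECONDS env var polled by the node server
  failureThreshold: 12   # cf. FAILURE_THRESHOLD
```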

glog.V(3).Infoln(fmt.Sprintf("Time out. Alluxio-fuse is not mounted to global mount point in %vs.", (retry - 1) * timeout))
return nil, status.Error(codes.DeadlineExceeded, fmt.Sprintf("alluxio-fuse is not mounted to global mount point in %vs", (retry - 1) * timeout))
}

func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return &csi.NodeStageVolumeResponse{}, nil
func getAndCompleteFusePodObj(nodeId string, req *csi.NodeStageVolumeRequest) (*v1.Pod, error) {
csiFusePodObj, err := getFusePodObj()
if err != nil {
return nil, err
}

// Append volumeId to pod name for uniqueness
csiFusePodObj.Name = csiFusePodObj.Name + "-" + req.GetVolumeId()

// Set node name for scheduling
csiFusePodObj.Spec.NodeName = nodeId

// Set Alluxio path to be mounted
targetPath := req.GetVolumeContext()["alluxioPath"]
if targetPath == "" {
targetPath = "/"
}
source := v1.EnvVar{Name: "FUSE_ALLUXIO_PATH", Value: targetPath}
csiFusePodObj.Spec.Containers[0].Env = append(csiFusePodObj.Spec.Containers[0].Env, source)

// Set mount path provided by CSI
mountPoint := v1.EnvVar{Name: "MOUNT_POINT", Value: req.GetStagingTargetPath()}
csiFusePodObj.Spec.Containers[0].Env = append(csiFusePodObj.Spec.Containers[0].Env, mountPoint)

// Set pre-stop command (umount) in pod lifecycle
lifecycle := &v1.Lifecycle {
PreStop: &v1.Handler {
Exec: &v1.ExecAction {
Command: []string{"/opt/alluxio/integration/fuse/bin/alluxio-fuse", "unmount", req.GetStagingTargetPath()},
},
},
}
csiFusePodObj.Spec.Containers[0].Lifecycle = lifecycle

// Set fuse mount options
fuseOptsStr := strings.Join(req.GetVolumeCapability().GetMount().GetMountFlags(), ",")
csiFusePodObj.Spec.Containers[0].Args = append(csiFusePodObj.Spec.Containers[0].Args, "--fuse-opts=" + fuseOptsStr)

// Update ALLUXIO_FUSE_JAVA_OPTS to include csi client java options
alluxioCSIFuseJavaOpts :=
strings.Join([]string{os.Getenv("ALLUXIO_FUSE_JAVA_OPTS"), req.GetVolumeContext()["javaOptions"]}, " ")
alluxioFuseJavaOptsEnv := v1.EnvVar{Name: "ALLUXIO_FUSE_JAVA_OPTS", Value: alluxioCSIFuseJavaOpts}
csiFusePodObj.Spec.Containers[0].Env = append(csiFusePodObj.Spec.Containers[0].Env, alluxioFuseJavaOptsEnv)

return csiFusePodObj, nil
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
podName := "alluxio-fuse-" + req.GetVolumeId()
if err := ns.client.CoreV1().Pods(os.Getenv("NAMESPACE")).Delete(podName, &metav1.DeleteOptions{}); err != nil {
if strings.Contains(err.Error(), "not found") {
// Pod not found. Try to clean up the mount point.
command := exec.Command("umount", req.GetStagingTargetPath())
glog.V(4).Infoln(command)
stdoutStderr, err := command.CombinedOutput()
if err != nil {
glog.V(3).Infoln(err)
}
glog.V(4).Infoln(string(stdoutStderr))
return &csi.NodeUnstageVolumeResponse{}, nil
}
return nil, status.Error(codes.Internal, fmt.Sprintf("Error deleting fuse pod %v\n%v", podName, err.Error()))
}
return &csi.NodeUnstageVolumeResponse{}, nil
}

func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (ns *nodeServer) isCorruptedDir(dir string) bool {
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability {
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
},
}, nil
}

func isCorruptedDir(dir string) bool {
pathExists, pathErr := mount.PathExists(dir)
glog.V(3).Infoln("isCorruptedDir(%s) returned with error: (%v, %v)\\n", dir, pathExists, pathErr)
return pathErr != nil && mount.IsCorruptedMnt(pathErr)
}

func (ns *nodeServer) ensureMountPoint(targetPath string) (bool, error) {
func ensureMountPoint(targetPath string) (bool, error) {
mounter := mount.New(targetPath)
notMnt, err := mounter.IsLikelyNotMountPoint(targetPath)

@@ -138,7 +314,7 @@ func (ns *nodeServer) ensureMountPoint(targetPath string) (bool, error) {
}
return true, nil
}
if ns.isCorruptedDir(targetPath) {
if isCorruptedDir(targetPath) {
glog.V(3).Infoln("detected corrupted mount for targetPath [%s]", targetPath)
if err := mounter.Unmount(targetPath); err != nil {
glog.V(3).Infoln("failed to umount corrupted path [%s]", targetPath)
@@ -148,3 +324,22 @@ }
}
return notMnt, err
}

func getFusePodObj() (*v1.Pod, error) {
csiFuseYaml, err := ioutil.ReadFile("/opt/alluxio/integration/kubernetes/csi/alluxio-csi-fuse.yaml")
if err != nil {
glog.V(3).Info("csi-fuse config yaml file not found")
return nil, status.Errorf(codes.NotFound, "csi-fuse config yaml file not found: %v", err.Error())
}
csiFuseObj, grpVerKind, err := scheme.Codecs.UniversalDeserializer().Decode(csiFuseYaml, nil, nil)
if err != nil {
glog.V(3).Info("Failed to decode csi-fuse config yaml file")
return nil, status.Errorf(codes.Internal, "Failed to decode csi-fuse config yaml file.\n", err.Error())
}
// Only support Fuse Pod
if grpVerKind.Kind != "Pod" {
glog.V(3).Info("csi-fuse only support pod. %v found.")
return nil, status.Errorf(codes.InvalidArgument, "csi-fuse only support Pod. %v found.\n%v", grpVerKind.Kind, err.Error())
}
return csiFuseObj.(*v1.Pod), nil
}