diff --git a/charts/fluid/fluid/templates/csi/daemonset.yaml b/charts/fluid/fluid/templates/csi/daemonset.yaml index ae594553611..a8d455f1324 100644 --- a/charts/fluid/fluid/templates/csi/daemonset.yaml +++ b/charts/fluid/fluid/templates/csi/daemonset.yaml @@ -84,6 +84,8 @@ spec: - name: MOUNT_ROOT value: {{ .Values.runtime.mountRoot | quote }} {{- end }} + - name: ALLOW_PATCH_STALE_NODE + value: "true" - name: KUBELET_ROOTDIR value: {{ .Values.csi.kubelet.rootDir }} - name: CSI_ENDPOINT diff --git a/pkg/csi/plugins/nodeserver.go b/pkg/csi/plugins/nodeserver.go index 534311a4034..a8d2eab4422 100644 --- a/pkg/csi/plugins/nodeserver.go +++ b/pkg/csi/plugins/nodeserver.go @@ -24,6 +24,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/volume" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" "os" "os/exec" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,12 +41,17 @@ import ( "k8s.io/utils/mount" ) +const ( + AllowPatchStaleNodeEnv = "ALLOW_PATCH_STALE_NODE" +) + type nodeServer struct { nodeId string *csicommon.DefaultNodeServer client client.Client apiReader client.Reader mutex sync.Mutex + node *v1.Node } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { @@ -234,7 +240,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag var labelsToModify common.LabelsToModify labelsToModify.Delete(fuseLabelKey) - node, err := kubeclient.GetNode(ns.apiReader, ns.nodeId) + node, err := ns.getNode() if err != nil { glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err) return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId) @@ -266,7 +272,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol var labelsToModify common.LabelsToModify labelsToModify.Add(fuseLabelKey, "true") - node, err := kubeclient.GetNode(ns.apiReader, ns.nodeId) + node, err := ns.getNode() if err != nil { glog.Errorf("NodeStageVolume: can't get node %s: %v", ns.nodeId, err) return nil, errors.Wrapf(err, "NodeStageVolume: can't get node %s", ns.nodeId) @@ -313,14 +319,34 @@ func (ns *nodeServer) getRuntimeNamespacedName(volumeContext map[string]string, runtimeName, nameFound := volumeContext[common.VolumeAttrName] runtimeNamespace, nsFound := volumeContext[common.VolumeAttrNamespace] if nameFound && nsFound { + glog.V(3).Infof("Get runtime namespace(%s) and name(%s) from volume context", runtimeNamespace, runtimeName) return runtimeNamespace, runtimeName, nil } } // Fallback: query API Server to get namespaced name + glog.Infof("Get runtime namespace and name directly from api server with volumeId %s", volumeId) return volume.GetNamespacedNameByVolumeId(ns.apiReader, volumeId) } +// getNode first checks cached node +func (ns *nodeServer) getNode() (node *v1.Node, err error) { + // Default to allow patch stale node info + if envVar, found := os.LookupEnv(AllowPatchStaleNodeEnv); !found || "true" == envVar { + if ns.node != nil { + glog.V(3).Infof("Found cached node %s", ns.node.Name) + return ns.node, nil + } + } + + if node, err = kubeclient.GetNode(ns.apiReader, ns.nodeId); err != nil { + return nil, err + } + glog.V(1).Infof("Got node %s from api server", node.Name) + ns.node = node + return ns.node, nil +} + func checkMountInUse(volumeName string) (bool, error) { var inUse bool glog.Infof("Try to check if the volume %s is being used", volumeName) diff --git a/pkg/ddc/goosefs/transform_permission.go b/pkg/ddc/goosefs/transform_permission.go index 088353fc819..93f9bad13cf 100644 --- a/pkg/ddc/goosefs/transform_permission.go +++ b/pkg/ddc/goosefs/transform_permission.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Fluid Authors. +Copyright 2022 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.