diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index 0018d753e..746be0066 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -52,6 +52,7 @@ func main() { flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is = where is the value of a 'kind' storage class parameter and is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.") flag.BoolVar(&cfg.EnableAttach, "enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.") flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)") + flag.StringVar(&cfg.DataRoot, "dataRoot", "/csi-data-dir", "the directory where persist volume and snapshot data are stored here") showVersion := flag.Bool("version", false, "Show version.") // The proxy-endpoint option is intended to used by the Kubernetes E2E test suite diff --git a/pkg/hostpath/capacity.go b/pkg/hostpath/capacity.go deleted file mode 100644 index c02ae672e..000000000 --- a/pkg/hostpath/capacity.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hostpath - -import ( - "errors" - "flag" - "fmt" - "strings" - - "k8s.io/apimachinery/pkg/api/resource" -) - -// Capacity simulates linear storage of certain types ("fast", -// "slow"). 
To calculate the amount of allocated space, the size of -// all currently existing volumes of the same kind is summed up. -// -// Available capacity is configurable with a command line flag -// -capacity = where is a string and -// is a quantity (1T, 1Gi). More than one of those -// flags can be used. -// -// The underlying map will be initialized if needed by Set, -// which makes it possible to define and use a Capacity instance -// without explicit initialization (`var capacity Capacity` or as -// member in a struct). -type Capacity map[string]resource.Quantity - -// Set is an implementation of flag.Value.Set. -func (c *Capacity) Set(arg string) error { - parts := strings.SplitN(arg, "=", 2) - if len(parts) != 2 { - return errors.New("must be of format =") - } - quantity, err := resource.ParseQuantity(parts[1]) - if err != nil { - return err - } - - // We overwrite any previous value. - if *c == nil { - *c = Capacity{} - } - (*c)[parts[0]] = quantity - return nil -} - -func (c *Capacity) String() string { - return fmt.Sprintf("%v", map[string]resource.Quantity(*c)) -} - -var _ flag.Value = &Capacity{} - -// Enabled returns true if capacities are configured. 
-func (c *Capacity) Enabled() bool { - return len(*c) > 0 -} diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go index e5c57de5b..d606eb5ec 100644 --- a/pkg/hostpath/controllerserver.go +++ b/pkg/hostpath/controllerserver.go @@ -19,12 +19,12 @@ package hostpath import ( "fmt" "math" - "os" "path/filepath" "sort" "strconv" "github.com/golang/protobuf/ptypes" + "github.com/kubernetes-csi/csi-driver-host-path/pkg/state" "github.com/golang/glog" "github.com/golang/protobuf/ptypes/wrappers" @@ -41,13 +41,6 @@ const ( deviceID = "deviceID" ) -type accessType int - -const ( - mountAccess accessType = iota - blockAccess -) - func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) { if err := hp.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { glog.V(3).Infof("invalid create volume req: %v", req) @@ -84,19 +77,19 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque return nil, status.Error(codes.InvalidArgument, "cannot have both block and mount access type") } - var requestedAccessType accessType + var requestedAccessType state.AccessType if accessTypeBlock { - requestedAccessType = blockAccess + requestedAccessType = state.BlockAccess } else { // Default to mount. - requestedAccessType = mountAccess + requestedAccessType = state.MountAccess } // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. 
- hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() capacity := int64(req.GetCapacityRange().GetRequiredBytes()) topologies := []*csi.Topology{ @@ -105,7 +98,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque // Need to check for already existing volume name, and if found // check for the requested capacity and already allocated capacity - if exVol, err := hp.getVolumeByName(req.GetName()); err == nil { + if exVol, err := hp.hostPathDriverState.GetVolumeByName(req.GetName()); err == nil { // Since err is nil, it means the volume with the same name already exists // need to check if the size of existing volume is the same as in new // request @@ -140,15 +133,15 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque } volumeID := uuid.NewUUID().String() - kind := req.GetParameters()[storageKind] - vol, err := hp.createVolume(volumeID, req.GetName(), capacity, requestedAccessType, false /* ephemeral */, kind) + kind := req.GetParameters()[state.StorageKind] + vol, err := hp.hostPathDriverState.CreateVolume(volumeID, req.GetName(), capacity, requestedAccessType, false /* ephemeral */, kind, hp.config.MaxVolumeSize, hp.config.Capacity) if err != nil { return nil, fmt.Errorf("failed to create volume %v: %w", volumeID, err) } glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath) if req.GetVolumeContentSource() != nil { - path := getVolumePath(volumeID) + path := state.GetVolumePath(volumeID, hp.config.DataRoot) volumeSource := req.VolumeContentSource switch volumeSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: @@ -166,7 +159,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque } if err != nil { glog.V(4).Infof("VolumeSource error: %v", err) - if delErr := hp.deleteVolume(volumeID); delErr != nil { + if delErr := 
hp.hostPathDriverState.DeleteVolume(volumeID, hp.config.Capacity); delErr != nil { glog.V(2).Infof("deleting hostpath volume %v failed: %v", volumeID, delErr) } return nil, err @@ -198,11 +191,11 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() volId := req.GetVolumeId() - vol, err := hp.getVolumeByID(volId) + vol, err := hp.hostPathDriverState.GetVolumeByID(volId) if err != nil { // Volume not found: might have already deleted return &csi.DeleteVolumeResponse{}, nil @@ -213,7 +206,7 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque vol.VolID, vol.IsAttached, vol.IsStaged, vol.IsPublished, vol.NodeID) } - if err := hp.deleteVolume(volId); err != nil { + if err := hp.hostPathDriverState.DeleteVolume(volId, hp.config.Capacity); err != nil { return nil, fmt.Errorf("failed to delete volume %v: %w", volId, err) } glog.V(4).Infof("volume %v successfully deleted", volId) @@ -239,10 +232,10 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. 
- hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() - if _, err := hp.getVolumeByID(req.GetVolumeId()); err != nil { + if _, err := hp.hostPathDriverState.GetVolumeByID(req.GetVolumeId()); err != nil { return nil, err } @@ -283,10 +276,10 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to hostpath Node ID %s", req.NodeId, hp.config.NodeID) } - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() - vol, err := hp.getVolumeByID(req.VolumeId) + vol, err := hp.hostPathDriverState.GetVolumeByID(req.VolumeId) if err != nil { return nil, status.Error(codes.NotFound, err.Error()) } @@ -305,7 +298,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro vol.IsAttached = true vol.ReadOnlyAttach = req.GetReadonly() - if err := hp.updateVolume(vol.VolID, vol); err != nil { + if err := hp.hostPathDriverState.UpdateVolume(vol.VolID, vol); err != nil { return nil, status.Errorf(codes.Internal, "failed to update volume %s: %v", vol.VolID, err) } @@ -328,10 +321,10 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, hp.config.NodeID) } - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() - vol, err := hp.getVolumeByID(req.VolumeId) + vol, err := hp.hostPathDriverState.GetVolumeByID(req.VolumeId) if err != nil { // Not an error: a non-existent volume is not published. 
// See also https://github.com/kubernetes-csi/external-attacher/pull/165 @@ -345,7 +338,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont } vol.IsAttached = false - if err := hp.updateVolume(vol.VolID, vol); err != nil { + if err := hp.hostPathDriverState.UpdateVolume(vol.VolID, vol); err != nil { return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err) } @@ -355,8 +348,8 @@ func (hp *hostPath) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() // Topology and capabilities are irrelevant. We only // distinguish based on the "kind" parameter, if at all. @@ -366,9 +359,9 @@ func (hp *hostPath) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest // Empty "kind" will return "zero capacity". There is no fallback // to some arbitrary kind here because in practice it always should // be set. - kind := req.GetParameters()[storageKind] + kind := req.GetParameters()[state.StorageKind] quantity := hp.config.Capacity[kind] - allocated := hp.sumVolumeSizes(kind) + allocated := hp.hostPathDriverState.SumVolumeSizes(kind) available = quantity.Value() - allocated } maxVolumeSize := hp.config.MaxVolumeSize @@ -393,15 +386,15 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest var ( startIdx, volumesLength, maxLength int64 - hpVolume hostPathVolume + hpVolume state.HostPathVolume ) // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. 
- hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() - volumeIds := hp.getSortedVolumeIDs() + volumeIds := hp.hostPathDriverState.GetSortedVolumeIDs() if req.StartingToken == "" { req.StartingToken = "1" } @@ -418,8 +411,13 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest maxLength = volumesLength } + volumes, err := hp.hostPathDriverState.ListVolumes() + if err != nil { + return nil, err + } + for index := startIdx - 1; index < volumesLength && index < maxLength; index++ { - hpVolume = hp.volumes[volumeIds[index]] + hpVolume = volumes[volumeIds[index]] healthy, msg := hp.doHealthCheckInControllerSide(volumeIds[index]) glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy) volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{ @@ -444,10 +442,10 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. 
- hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() - volume, err := hp.getVolumeByID(req.GetVolumeId()) + volume, err := hp.hostPathDriverState.GetVolumeByID(req.GetVolumeId()) if err != nil { return nil, err } @@ -471,7 +469,7 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller // getSnapshotPath returns the full path to where the snapshot is stored func getSnapshotPath(snapshotID string) string { - return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, snapshotExt)) + return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, state.SnapshotExt)) } // CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create @@ -492,12 +490,12 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetSnapshotLocker().Lock() + defer hp.hostPathDriverState.GetSnapshotLocker().Unlock() // Need to check for already existing snapshot name, and if found check for the // requested sourceVolumeId and sourceVolumeId of snapshot that has been created. - if exSnap, err := hp.getSnapshotByName(req.GetName()); err == nil { + if exSnap, err := hp.hostPathDriverState.GetSnapshotByName(req.GetName()); err == nil { // Since err is nil, it means the snapshot with the same name already exists need // to check if the sourceVolumeId of existing snapshot is the same as in new request. 
if exSnap.VolID == req.GetSourceVolumeId() { @@ -515,8 +513,13 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR return nil, status.Errorf(codes.AlreadyExists, "snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName()) } + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() + volumeID := req.GetSourceVolumeId() - hostPathVolume, err := hp.getVolumeByID(volumeID) + hostPathVolume, err := hp.hostPathDriverState.GetVolumeByID(volumeID) if err != nil { return nil, err } @@ -527,7 +530,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR file := getSnapshotPath(snapshotID) var cmd []string - if hostPathVolume.VolAccessType == blockAccess { + if hostPathVolume.VolAccessType == state.BlockAccess { glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume") cmd = []string{"cp", volPath, file} } else { @@ -541,16 +544,10 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR } glog.V(4).Infof("create volume snapshot %s", file) - snapshot := hostPathSnapshot{} - snapshot.Name = req.GetName() - snapshot.Id = snapshotID - snapshot.VolID = volumeID - snapshot.Path = file - snapshot.CreationTime = creationTime - snapshot.SizeBytes = hostPathVolume.VolSize - snapshot.ReadyToUse = true - - hp.snapshots[snapshotID] = snapshot + snapshot, err := hp.hostPathDriverState.CreateSnapshot(req.GetName(), snapshotID, volumeID, file, creationTime, hostPathVolume.VolSize, true) + if err != nil { + return nil, err + } return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ @@ -577,54 +574,60 @@ func (hp *hostPath) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotR // Lock before acting on global state. 
A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetSnapshotLocker().Lock() + defer hp.hostPathDriverState.GetSnapshotLocker().Unlock() glog.V(4).Infof("deleting snapshot %s", snapshotID) - path := getSnapshotPath(snapshotID) - os.RemoveAll(path) - delete(hp.snapshots, snapshotID) + err := hp.hostPathDriverState.DeleteSnapshot(snapshotID) + if err != nil { + return nil, err + } + return &csi.DeleteSnapshotResponse{}, nil } func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.hostPathDriverState.GetSnapshotLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetSnapshotLocker().RLocker().Unlock() if err := hp.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS); err != nil { glog.V(3).Infof("invalid list snapshot req: %v", req) return nil, err } - // Lock before acting on global state. A production-quality - // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + snapshots, err := hp.hostPathDriverState.ListSnapshots() + if err != nil { + return nil, err + } // case 1: SnapshotId is not empty, return snapshots that match the snapshot id. if len(req.GetSnapshotId()) != 0 { snapshotID := req.SnapshotId - if snapshot, ok := hp.snapshots[snapshotID]; ok { + if snapshot, ok := snapshots[snapshotID]; ok { return convertSnapshot(snapshot), nil } } // case 2: SourceVolumeId is not empty, return snapshots that match the source volume id. 
if len(req.GetSourceVolumeId()) != 0 { - for _, snapshot := range hp.snapshots { + for _, snapshot := range snapshots { if snapshot.VolID == req.SourceVolumeId { return convertSnapshot(snapshot), nil } } } - var snapshots []csi.Snapshot + var snapshotList []csi.Snapshot // case 3: no parameter is set, so we return all the snapshots. sortedKeys := make([]string, 0) - for k := range hp.snapshots { + for k := range snapshots { sortedKeys = append(sortedKeys, k) } sort.Strings(sortedKeys) for _, key := range sortedKeys { - snap := hp.snapshots[key] + snap := snapshots[key] snapshot := csi.Snapshot{ SnapshotId: snap.Id, SourceVolumeId: snap.VolID, @@ -632,11 +635,11 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq SizeBytes: snap.SizeBytes, ReadyToUse: snap.ReadyToUse, } - snapshots = append(snapshots, snapshot) + snapshotList = append(snapshotList, snapshot) } var ( - ulenSnapshots = int32(len(snapshots)) + ulenSnapshots = int32(len(snapshotList)) maxEntries = req.MaxEntries startingToken int32 ) @@ -678,7 +681,7 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq for i = 0; i < len(entries); i++ { entries[i] = &csi.ListSnapshotsResponse_Entry{ - Snapshot: &snapshots[j], + Snapshot: &snapshotList[j], } j++ } @@ -713,17 +716,17 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. 
- hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() - exVol, err := hp.getVolumeByID(volID) + exVol, err := hp.hostPathDriverState.GetVolumeByID(volID) if err != nil { return nil, err } if exVol.VolSize < capacity { exVol.VolSize = capacity - if err := hp.updateVolume(volID, exVol); err != nil { + if err := hp.hostPathDriverState.UpdateVolume(volID, exVol); err != nil { return nil, fmt.Errorf("could not update volume %s: %w", volID, err) } } @@ -734,7 +737,7 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control }, nil } -func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse { +func convertSnapshot(snap state.HostPathSnapshot) *csi.ListSnapshotsResponse { entries := []*csi.ListSnapshotsResponse_Entry{ { Snapshot: &csi.Snapshot{ diff --git a/pkg/hostpath/healthcheck.go b/pkg/hostpath/healthcheck.go index 67d1c162f..0c6108c9f 100644 --- a/pkg/hostpath/healthcheck.go +++ b/pkg/hostpath/healthcheck.go @@ -139,7 +139,11 @@ func (hp *hostPath) checkPVCapacityValid(volumeHandle string) (bool, error) { return false, fmt.Errorf("failed to get capacity info: %+v", err) } - volumeCapacity := hp.volumes[volumeHandle].VolSize + volume, err := hp.hostPathDriverState.GetVolumeByID(volumeHandle) + if err != nil { + return false, err + } + volumeCapacity := volume.VolSize glog.V(3).Infof("volume capacity: %+v fs capacity:%+v", volumeCapacity, fscapacity) return fscapacity >= volumeCapacity, nil } diff --git a/pkg/hostpath/healthcheck_test.go b/pkg/hostpath/healthcheck_test.go index 881d32ea5..280c36288 100644 --- a/pkg/hostpath/healthcheck_test.go +++ b/pkg/hostpath/healthcheck_test.go @@ -346,15 +346,3 @@ func TestParseMountInfo(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, containerFileSystems) } - -func TestFilterVolumeName(t *testing.T) { - targetPath := 
"/var/lib/kubelet/pods/3440e8ee-10de-11eb-8895-fa163feebd84/volumes/kubernetes.io~csi/pvc-33d023c7-10de-11eb-8895-fa163feebd84/mount" - volumeName := filterVolumeName(targetPath) - assert.Equal(t, "pvc-33d023c7-10de-11eb-8895-fa163feebd84", volumeName) -} - -func TestFilterVolumeID(t *testing.T) { - sourcePath := "/dev/vda2[/var/lib/csi-hostpath-data/39267558-10de-11eb-8fb9-0a58ac120605]" - volumeID := filterVolumeID(sourcePath) - assert.Equal(t, "39267558-10de-11eb-8fb9-0a58ac120605", volumeID) -} diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 4b6f8ba5e..5a56a8846 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -20,74 +20,18 @@ import ( "errors" "fmt" "io" - "io/ioutil" "os" - "os/exec" - "path/filepath" - "regexp" - "sort" - "strings" - "sync" "github.com/golang/glog" - timestamp "github.com/golang/protobuf/ptypes/timestamp" + "github.com/kubernetes-csi/csi-driver-host-path/pkg/state" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/apimachinery/pkg/api/resource" - fs "k8s.io/kubernetes/pkg/volume/util/fs" - "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" utilexec "k8s.io/utils/exec" ) -const ( - kib int64 = 1024 - mib int64 = kib * 1024 - gib int64 = mib * 1024 - gib100 int64 = gib * 100 - tib int64 = gib * 1024 - tib100 int64 = tib * 100 - - // storageKind is the special parameter which requests - // storage of a certain kind (only affects capacity checks). - storageKind = "kind" -) - type hostPath struct { - config Config - - // gRPC calls involving any of the fields below must be serialized - // by locking this mutex before starting. Internal helper - // functions assume that the mutex has been locked. 
- mutex sync.Mutex - volumes map[string]hostPathVolume - snapshots map[string]hostPathSnapshot -} - -type hostPathVolume struct { - VolName string `json:"volName"` - VolID string `json:"volID"` - VolSize int64 `json:"volSize"` - VolPath string `json:"volPath"` - VolAccessType accessType `json:"volAccessType"` - ParentVolID string `json:"parentVolID,omitempty"` - ParentSnapID string `json:"parentSnapID,omitempty"` - Ephemeral bool `json:"ephemeral"` - NodeID string `json:"nodeID"` - Kind string `json:"kind"` - ReadOnlyAttach bool `json:"readOnlyAttach"` - IsAttached bool `json:"isAttached"` - IsStaged bool `json:"isStaged"` - IsPublished bool `json:"isPublished"` -} - -type hostPathSnapshot struct { - Name string `json:"name"` - Id string `json:"id"` - VolID string `json:"volID"` - Path string `json:"path"` - CreationTime *timestamp.Timestamp `json:"creationTime"` - SizeBytes int64 `json:"sizeBytes"` - ReadyToUse bool `json:"readyToUse"` + config Config + hostPathDriverState state.DriverState } type Config struct { @@ -98,10 +42,11 @@ type Config struct { VendorVersion string MaxVolumesPerNode int64 MaxVolumeSize int64 - Capacity Capacity + Capacity state.Capacity Ephemeral bool ShowVersion bool EnableAttach bool + DataRoot string } var ( @@ -113,9 +58,6 @@ const ( // This can be ephemeral within the container or persisted if // backed by a Pod volume. dataRoot = "/csi-data-dir" - - // Extension with which snapshot files will be saved. 
- snapshotExt = ".snap" ) func NewHostPathDriver(cfg Config) (*hostPath, error) { @@ -138,93 +80,18 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) { glog.Infof("Driver: %v ", cfg.DriverName) glog.Infof("Version: %s", cfg.VendorVersion) - hp := &hostPath{ - config: cfg, - volumes: map[string]hostPathVolume{}, - snapshots: map[string]hostPathSnapshot{}, - } - if err := hp.discoveryExistingVolumes(); err != nil { - return nil, err - } - hp.discoverExistingSnapshots() - return hp, nil -} - -func getSnapshotID(file string) (bool, string) { - glog.V(4).Infof("file: %s", file) - // Files with .snap extension are volumesnapshot files. - // e.g. foo.snap, foo.bar.snap - if filepath.Ext(file) == snapshotExt { - return true, strings.TrimSuffix(file, snapshotExt) - } - return false, "" -} - -func (h *hostPath) discoverExistingSnapshots() { - glog.V(4).Infof("discovering existing snapshots in %s", dataRoot) - files, err := ioutil.ReadDir(dataRoot) - if err != nil { - glog.Errorf("failed to discover snapshots under %s: %v", dataRoot, err) - } - for _, file := range files { - isSnapshot, snapshotID := getSnapshotID(file.Name()) - if isSnapshot { - glog.V(4).Infof("adding snapshot %s from file %s", snapshotID, getSnapshotPath(snapshotID)) - h.snapshots[snapshotID] = hostPathSnapshot{ - Id: snapshotID, - Path: getSnapshotPath(snapshotID), - ReadyToUse: true, - } - } - } -} - -func (hp *hostPath) discoveryExistingVolumes() error { - cmdPath, err := exec.LookPath("findmnt") - if err != nil { - return fmt.Errorf("findmnt not found: %w", err) - } - - out, err := exec.Command(cmdPath, "--json").CombinedOutput() - if err != nil { - glog.V(3).Infof("failed to execute command: %+v", cmdPath) - return err - } - - if len(out) < 1 { - return fmt.Errorf("mount point info is nil") - } - - mountInfos, err := parseMountInfo([]byte(out)) + // build up volumes and snapshot data in memory + hostPathDriverState, err := state.LoadStateFromFile(cfg.DataRoot) if err != nil { - return 
fmt.Errorf("failed to parse the mount infos: %+v", err) - } - - mountInfosOfPod := MountPointInfo{} - for _, mountInfo := range mountInfos { - if mountInfo.Target == podVolumeTargetPath { - mountInfosOfPod = mountInfo - break - } + return nil, err } - // getting existing volumes based on the mount point infos. - // It's a temporary solution to recall volumes. - // TODO: discover what kind of storage was used and the nominal size. - for _, pv := range mountInfosOfPod.ContainerFileSystem { - if !strings.Contains(pv.Target, csiSignOfVolumeTargetPath) { - continue - } - - hpv, err := parseVolumeInfo(pv) - if err != nil { - return err - } - hp.volumes[hpv.VolID] = *hpv + hp := &hostPath{ + config: cfg, + hostPathDriverState: hostPathDriverState, } - glog.V(4).Infof("Existing Volumes: %+v", hp.volumes) - return nil + return hp, nil } func (hp *hostPath) Run() error { @@ -236,171 +103,6 @@ func (hp *hostPath) Run() error { return nil } -func (hp *hostPath) getVolumeByID(volumeID string) (hostPathVolume, error) { - if hostPathVol, ok := hp.volumes[volumeID]; ok { - return hostPathVol, nil - } - return hostPathVolume{}, status.Errorf(codes.NotFound, "volume id %s does not exist in the volumes list", volumeID) -} - -func (hp *hostPath) getVolumeByName(volName string) (hostPathVolume, error) { - for _, hostPathVol := range hp.volumes { - if hostPathVol.VolName == volName { - return hostPathVol, nil - } - } - return hostPathVolume{}, status.Errorf(codes.NotFound, "volume name %s does not exist in the volumes list", volName) -} - -func (hp *hostPath) getSnapshotByName(name string) (hostPathSnapshot, error) { - for _, snapshot := range hp.snapshots { - if snapshot.Name == name { - return snapshot, nil - } - } - return hostPathSnapshot{}, status.Errorf(codes.NotFound, "snapshot name %s does not exist in the snapshots list", name) -} - -// getVolumePath returns the canonical path for hostpath volume -func getVolumePath(volID string) string { - return filepath.Join(dataRoot, volID) 
-} - -// createVolume allocates capacity, creates the directory for the hostpath volume, and -// adds the volume to the list. -// -// It returns the volume path or err if one occurs. That error is suitable as result of a gRPC call. -func (hp *hostPath) createVolume(volID, name string, cap int64, volAccessType accessType, ephemeral bool, kind string) (hpv *hostPathVolume, finalErr error) { - // Check for maximum available capacity - if cap > hp.config.MaxVolumeSize { - return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", cap, hp.config.MaxVolumeSize) - } - if hp.config.Capacity.Enabled() { - if kind == "" { - // Pick some kind with sufficient remaining capacity. - for k, c := range hp.config.Capacity { - if hp.sumVolumeSizes(k)+cap <= c.Value() { - kind = k - break - } - } - } - if kind == "" { - // Still nothing?! - return nil, status.Errorf(codes.OutOfRange, "requested capacity %d of arbitrary storage exceeds all remaining capacity", cap) - } - used := hp.sumVolumeSizes(kind) - available := hp.config.Capacity[kind] - if used+cap > available.Value() { - - return nil, status.Errorf(codes.OutOfRange, "requested capacity %d exceeds remaining capacity for %q, %s out of %s already used", - cap, kind, resource.NewQuantity(used, resource.BinarySI).String(), available.String()) - } - } else if kind != "" { - return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("capacity tracking disabled, specifying kind %q is invalid", kind)) - } - - path := getVolumePath(volID) - - switch volAccessType { - case mountAccess: - err := os.MkdirAll(path, 0777) - if err != nil { - return nil, err - } - case blockAccess: - executor := utilexec.New() - size := fmt.Sprintf("%dM", cap/mib) - // Create a block file. 
- _, err := os.Stat(path) - if err != nil { - if os.IsNotExist(err) { - out, err := executor.Command("fallocate", "-l", size, path).CombinedOutput() - if err != nil { - return nil, fmt.Errorf("failed to create block device: %v, %v", err, string(out)) - } - } else { - return nil, fmt.Errorf("failed to stat block device: %v, %v", path, err) - } - } - - // Associate block file with the loop device. - volPathHandler := volumepathhandler.VolumePathHandler{} - _, err = volPathHandler.AttachFileDevice(path) - if err != nil { - // Remove the block file because it'll no longer be used again. - if err2 := os.Remove(path); err2 != nil { - glog.Errorf("failed to cleanup block file %s: %v", path, err2) - } - return nil, fmt.Errorf("failed to attach device %v: %v", path, err) - } - default: - return nil, fmt.Errorf("unsupported access type %v", volAccessType) - } - - hostpathVol := hostPathVolume{ - VolID: volID, - VolName: name, - VolSize: cap, - VolPath: path, - VolAccessType: volAccessType, - Ephemeral: ephemeral, - Kind: kind, - } - glog.V(4).Infof("adding hostpath volume: %s = %+v", volID, hostpathVol) - hp.volumes[volID] = hostpathVol - return &hostpathVol, nil -} - -// updateVolume updates the existing hostpath volume. -func (hp *hostPath) updateVolume(volID string, volume hostPathVolume) error { - glog.V(4).Infof("updating hostpath volume: %s", volID) - - if _, err := hp.getVolumeByID(volID); err != nil { - return err - } - - hp.volumes[volID] = volume - return nil -} - -// deleteVolume deletes the directory for the hostpath volume. -func (hp *hostPath) deleteVolume(volID string) error { - glog.V(4).Infof("starting to delete hostpath volume: %s", volID) - - vol, err := hp.getVolumeByID(volID) - if err != nil { - // Return OK if the volume is not found. 
- return nil - } - - if vol.VolAccessType == blockAccess { - volPathHandler := volumepathhandler.VolumePathHandler{} - path := getVolumePath(volID) - glog.V(4).Infof("deleting loop device for file %s if it exists", path) - if err := volPathHandler.DetachFileDevice(path); err != nil { - return fmt.Errorf("failed to remove loop device for file %s: %v", path, err) - } - } - - path := getVolumePath(volID) - if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { - return err - } - delete(hp.volumes, volID) - glog.V(4).Infof("deleted hostpath volume: %s = %+v", volID, vol) - return nil -} - -func (hp *hostPath) sumVolumeSizes(kind string) (sum int64) { - for _, volume := range hp.volumes { - if volume.Kind == kind { - sum += volume.VolSize - } - } - return -} - // hostPathIsEmpty is a simple check to determine if the specified hostpath directory // is empty or not. func hostPathIsEmpty(p string) (bool, error) { @@ -418,11 +120,12 @@ func hostPathIsEmpty(p string) (bool, error) { } // loadFromSnapshot populates the given destPath with data from the snapshotID -func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mode accessType) error { - snapshot, ok := hp.snapshots[snapshotId] - if !ok { - return status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId) +func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mode state.AccessType) error { + snapshot, err := hp.hostPathDriverState.GetSnapshotByID(snapshotId) + if err != nil { + return err } + if snapshot.ReadyToUse != true { return fmt.Errorf("snapshot %v is not yet ready to use", snapshotId) } @@ -433,12 +136,12 @@ func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mo var cmd []string switch mode { - case mountAccess: + case state.MountAccess: cmd = []string{"tar", "zxvf", snapshotPath, "-C", destPath} - case blockAccess: + case state.BlockAccess: cmd = []string{"dd", "if=" + snapshotPath, "of=" + destPath} default: - return 
status.Errorf(codes.InvalidArgument, "unknown accessType: %d", mode) + return status.Errorf(codes.InvalidArgument, "unknown state.AccessType: %d", mode) } executor := utilexec.New() @@ -452,29 +155,30 @@ func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mo } // loadFromVolume populates the given destPath with data from the srcVolumeID -func (hp *hostPath) loadFromVolume(size int64, srcVolumeId, destPath string, mode accessType) error { - hostPathVolume, ok := hp.volumes[srcVolumeId] - if !ok { - return status.Error(codes.NotFound, "source volumeId does not exist, are source/destination in the same storage class?") +func (hp *hostPath) loadFromVolume(size int64, srcVolumeId, destPath string, mode state.AccessType) error { + hostPathVolume, err := hp.hostPathDriverState.GetVolumeByID(srcVolumeId) + if err != nil { + return err } if hostPathVolume.VolSize > size { return status.Errorf(codes.InvalidArgument, "volume %v size %v is greater than requested volume size %v", srcVolumeId, hostPathVolume.VolSize, size) } + if mode != hostPathVolume.VolAccessType { return status.Errorf(codes.InvalidArgument, "volume %v mode is not compatible with requested mode", srcVolumeId) } switch mode { - case mountAccess: + case state.MountAccess: return loadFromFilesystemVolume(hostPathVolume, destPath) - case blockAccess: + case state.BlockAccess: return loadFromBlockVolume(hostPathVolume, destPath) default: - return status.Errorf(codes.InvalidArgument, "unknown accessType: %d", mode) + return status.Errorf(codes.InvalidArgument, "unknown state.AccessType: %d", mode) } } -func loadFromFilesystemVolume(hostPathVolume hostPathVolume, destPath string) error { +func loadFromFilesystemVolume(hostPathVolume state.HostPathVolume, destPath string) error { srcPath := hostPathVolume.VolPath isEmpty, err := hostPathIsEmpty(srcPath) if err != nil { @@ -493,7 +197,7 @@ func loadFromFilesystemVolume(hostPathVolume hostPathVolume, destPath string) er return nil } -func 
loadFromBlockVolume(hostPathVolume hostPathVolume, destPath string) error { +func loadFromBlockVolume(hostPathVolume state.HostPathVolume, destPath string) error { srcPath := hostPathVolume.VolPath args := []string{"if=" + srcPath, "of=" + destPath} executor := utilexec.New() @@ -503,54 +207,3 @@ func loadFromBlockVolume(hostPathVolume hostPathVolume, destPath string) error { } return nil } - -func (hp *hostPath) getSortedVolumeIDs() []string { - ids := make([]string, len(hp.volumes)) - index := 0 - for volId := range hp.volumes { - ids[index] = volId - index += 1 - } - - sort.Strings(ids) - return ids -} - -func filterVolumeName(targetPath string) string { - pathItems := strings.Split(targetPath, "kubernetes.io~csi/") - if len(pathItems) < 2 { - return "" - } - - return strings.TrimSuffix(pathItems[1], "/mount") -} - -func filterVolumeID(sourcePath string) string { - volumeSourcePathRegex := regexp.MustCompile(`\[(.*)\]`) - volumeSP := string(volumeSourcePathRegex.Find([]byte(sourcePath))) - if volumeSP == "" { - return "" - } - - return strings.TrimSuffix(strings.TrimPrefix(volumeSP, "[/var/lib/csi-hostpath-data/"), "]") -} - -func parseVolumeInfo(volume MountPointInfo) (*hostPathVolume, error) { - volumeName := filterVolumeName(volume.Target) - volumeID := filterVolumeID(volume.Source) - sourcePath := getSourcePath(volumeID) - _, fscapacity, _, _, _, _, err := fs.FsInfo(sourcePath) - if err != nil { - return nil, fmt.Errorf("failed to get capacity info: %+v", err) - } - - hp := hostPathVolume{ - VolName: volumeName, - VolID: volumeID, - VolSize: fscapacity, - VolPath: getVolumePath(volumeID), - VolAccessType: mountAccess, - } - - return &hp, nil -} diff --git a/pkg/hostpath/hostpath_test.go b/pkg/hostpath/hostpath_test.go deleted file mode 100644 index e1aa1e46c..000000000 --- a/pkg/hostpath/hostpath_test.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hostpath - -import ( - "testing" -) - -func TestGetSnapshotID(t *testing.T) { - testCases := []struct { - name string - inputPath string - expectedIsSnapshot bool - expectedSnapshotID string - }{ - { - name: "should recognize foo.snap as a valid snapshot with ID foo", - inputPath: "foo.snap", - expectedIsSnapshot: true, - expectedSnapshotID: "foo", - }, - { - name: "should recognize baz.tar.gz as an invalid snapshot", - inputPath: "baz.tar.gz", - expectedIsSnapshot: false, - expectedSnapshotID: "", - }, - { - name: "should recognize baz.tar.snap as a valid snapshot with ID baz.tar", - inputPath: "baz.tar.snap", - expectedIsSnapshot: true, - expectedSnapshotID: "baz.tar", - }, - { - name: "should recognize baz.tar.snap.snap as a valid snapshot with ID baz.tar.snap", - inputPath: "baz.tar.snap.snap", - expectedIsSnapshot: true, - expectedSnapshotID: "baz.tar.snap", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - actualIsSnapshot, actualSnapshotID := getSnapshotID(tc.inputPath) - if actualIsSnapshot != tc.expectedIsSnapshot { - t.Errorf("unexpected result for path %s, Want: %t, Got: %t", tc.inputPath, tc.expectedIsSnapshot, actualIsSnapshot) - } - if actualSnapshotID != tc.expectedSnapshotID { - t.Errorf("unexpected snapshotID for path %s, Want: %s; Got :%s", tc.inputPath, tc.expectedSnapshotID, actualSnapshotID) - } - }) - } -} diff --git a/pkg/hostpath/nodeserver.go 
b/pkg/hostpath/nodeserver.go index 61115e109..461a55c6f 100644 --- a/pkg/hostpath/nodeserver.go +++ b/pkg/hostpath/nodeserver.go @@ -23,6 +23,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/glog" + "github.com/kubernetes-csi/csi-driver-host-path/pkg/state" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -56,17 +57,17 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() // if ephemeral is specified, create volume here to avoid errors if ephemeralVolume { volID := req.GetVolumeId() volName := fmt.Sprintf("ephemeral-%s", volID) - kind := req.GetVolumeContext()[storageKind] + kind := req.GetVolumeContext()[state.StorageKind] // Configurable size would be nice. For now we use a small, fixed volume size of 100Mi. 
volSize := int64(100 * 1024 * 1024) - vol, err := hp.createVolume(req.GetVolumeId(), volName, volSize, mountAccess, ephemeralVolume, kind) + vol, err := hp.hostPathDriverState.CreateVolume(req.GetVolumeId(), volName, volSize, state.MountAccess, ephemeralVolume, kind, hp.config.MaxVolumeSize, hp.config.Capacity) if err != nil && !os.IsExist(err) { glog.Error("ephemeral mode failed to create volume: ", err) return nil, err @@ -74,7 +75,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV glog.V(4).Infof("ephemeral mode: created volume: %s", vol.VolPath) } - vol, err := hp.getVolumeByID(req.GetVolumeId()) + vol, err := hp.hostPathDriverState.GetVolumeByID(req.GetVolumeId()) if err != nil { return nil, status.Error(codes.NotFound, err.Error()) } @@ -84,7 +85,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV } if req.GetVolumeCapability().GetBlock() != nil { - if vol.VolAccessType != blockAccess { + if vol.VolAccessType != state.BlockAccess { return nil, status.Error(codes.InvalidArgument, "cannot publish a non-block volume as block volume") } @@ -128,7 +129,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV return nil, fmt.Errorf("failed to mount block device: %s at %s: %w", loopDevice, targetPath, err) } } else if req.GetVolumeCapability().GetMount() != nil { - if vol.VolAccessType != mountAccess { + if vol.VolAccessType != state.MountAccess { return nil, status.Error(codes.InvalidArgument, "cannot publish a non-mount volume as mount volume") } @@ -168,7 +169,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV options = append(options, "ro") } mounter := mount.New("") - path := getVolumePath(volumeId) + path := state.GetVolumePath(volumeId, hp.config.DataRoot) if err := mounter.Mount(path, targetPath, "", options); err != nil { var errList strings.Builder @@ -184,7 +185,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, 
req *csi.NodePublishV vol.NodeID = hp.config.NodeID vol.IsPublished = true - hp.updateVolume(req.GetVolumeId(), vol) + hp.hostPathDriverState.UpdateVolume(req.GetVolumeId(), vol) return &csi.NodePublishVolumeResponse{}, nil } @@ -202,10 +203,10 @@ func (hp *hostPath) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() - vol, err := hp.getVolumeByID(volumeID) + vol, err := hp.hostPathDriverState.GetVolumeByID(volumeID) if err != nil { return nil, status.Error(codes.NotFound, err.Error()) } @@ -231,12 +232,12 @@ func (hp *hostPath) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl if vol.Ephemeral { glog.V(4).Infof("deleting volume %s", volumeID) - if err := hp.deleteVolume(volumeID); err != nil && !os.IsNotExist(err) { + if err := hp.hostPathDriverState.DeleteVolume(volumeID, hp.config.Capacity); err != nil && !os.IsNotExist(err) { return nil, fmt.Errorf("failed to delete volume: %w", err) } } else { vol.IsPublished = false - if err := hp.updateVolume(vol.VolID, vol); err != nil { + if err := hp.hostPathDriverState.UpdateVolume(vol.VolID, vol); err != nil { return nil, fmt.Errorf("could not update volume %s: %w", vol.VolID, err) } } @@ -245,7 +246,6 @@ func (hp *hostPath) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl } func (hp *hostPath) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - // Check arguments if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") @@ -257,7 +257,12 @@ func (hp *hostPath) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolum return nil, status.Error(codes.InvalidArgument, "Volume Capability missing in request") } - vol, err 
:= hp.getVolumeByID(req.VolumeId) + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() + + vol, err := hp.hostPathDriverState.GetVolumeByID(req.VolumeId) if err != nil { return nil, status.Error(codes.NotFound, err.Error()) } @@ -268,7 +273,7 @@ func (hp *hostPath) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolum } vol.IsStaged = true - if err := hp.updateVolume(vol.VolID, vol); err != nil { + if err := hp.hostPathDriverState.UpdateVolume(vol.VolID, vol); err != nil { return nil, fmt.Errorf("could not update volume %s: %w", vol.VolID, err) } @@ -285,7 +290,12 @@ func (hp *hostPath) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageV return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - vol, err := hp.getVolumeByID(req.VolumeId) + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.hostPathDriverState.GetVolumeLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().Unlock() + + vol, err := hp.hostPathDriverState.GetVolumeByID(req.VolumeId) if err != nil { return nil, status.Error(codes.NotFound, err.Error()) } @@ -294,7 +304,7 @@ func (hp *hostPath) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageV return nil, status.Errorf(codes.Internal, "Volume '%s' is still pulished on '%s' node", vol.VolID, vol.NodeID) } vol.IsStaged = false - if err := hp.updateVolume(vol.VolID, vol); err != nil { + if err := hp.hostPathDriverState.UpdateVolume(vol.VolID, vol); err != nil { return nil, fmt.Errorf("could not update volume %s: %w", vol.VolID, err) } @@ -359,15 +369,20 @@ func (hp *hostPath) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolum // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. 
- hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() + + volumes, err := hp.hostPathDriverState.ListVolumes() + if err != nil { + return nil, err + } - volume, ok := hp.volumes[in.GetVolumeId()] + volume, ok := volumes[in.GetVolumeId()] if !ok { return nil, status.Error(codes.NotFound, "The volume not found") } - _, err := os.Stat(in.GetVolumePath()) + _, err = os.Stat(in.GetVolumePath()) if err != nil { return nil, status.Errorf(codes.NotFound, "Could not get file information from %s: %+v", in.GetVolumePath(), err) } @@ -411,10 +426,10 @@ func (hp *hostPath) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVol // Lock before acting on global state. A production-quality // driver might use more fine-grained locking. - hp.mutex.Lock() - defer hp.mutex.Unlock() + hp.hostPathDriverState.GetVolumeLocker().RLocker().Lock() + defer hp.hostPathDriverState.GetVolumeLocker().RLocker().Unlock() - vol, err := hp.getVolumeByID(volID) + vol, err := hp.hostPathDriverState.GetVolumeByID(volID) if err != nil { // Assume not found error return nil, status.Errorf(codes.NotFound, "Could not get volume %s: %v", volID, err) @@ -432,11 +447,11 @@ func (hp *hostPath) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVol switch m := info.Mode(); { case m.IsDir(): - if vol.VolAccessType != mountAccess { + if vol.VolAccessType != state.MountAccess { return nil, status.Errorf(codes.InvalidArgument, "Volume %s is not a directory", volID) } case m&os.ModeDevice != 0: - if vol.VolAccessType != blockAccess { + if vol.VolAccessType != state.BlockAccess { return nil, status.Errorf(codes.InvalidArgument, "Volume %s is not a block device", volID) } default: diff --git a/pkg/state/state.go b/pkg/state/state.go new file mode 100644 index 000000000..8f0d461ad --- /dev/null +++ b/pkg/state/state.go @@ -0,0 +1,421 @@ +package state + +import ( + "encoding/json" + "errors" 
+ "flag" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strings" + "sync" + + "github.com/golang/glog" + "github.com/golang/protobuf/ptypes/timestamp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" + utilexec "k8s.io/utils/exec" +) + +func (hps *HostPathDriverState) ListVolumes() (HostPathVolumes, error) { + return hps.HostPathVolumes, nil +} + +func (hps *HostPathDriverState) GetVolumeByID(volID string) (HostPathVolume, error) { + hpv, ok := hps.HostPathVolumes[volID] + if !ok { + return HostPathVolume{}, status.Errorf(codes.NotFound, "volume id %s does not exist in the volumes list", volID) + + } + + return hpv, nil +} + +func (hps *HostPathDriverState) GetVolumeByName(volName string) (HostPathVolume, error) { + for _, volume := range hps.HostPathVolumes { + if volume.VolName == volName { + return volume, nil + } + } + + return HostPathVolume{}, status.Errorf(codes.NotFound, "volume name %s does not exist in the volumes list", volName) +} + +// CreateVolume allocates capacity, creates the directory for the hostpath volume, and +// adds the volume to the list. +// +// It returns the volume path or err if one occurs. That error is suitable as result of a gRPC call. +func (hps *HostPathDriverState) CreateVolume(volID, name string, cap int64, volAccessType AccessType, ephemeral bool, kind string, maxVolumeSize int64, capacity Capacity) (hpv *HostPathVolume, finalErr error) { + // Check for maximum available capacity + if cap > maxVolumeSize { + return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", cap, maxVolumeSize) + } + if capacity.Enabled() { + if kind == "" { + // Pick some kind with sufficient remaining capacity. + for k, c := range capacity { + if hps.SumVolumeSizes(k)+cap <= c.Value() { + kind = k + break + } + } + } + if kind == "" { + // Still nothing?! 
+ return nil, status.Errorf(codes.OutOfRange, "requested capacity %d of arbitrary storage exceeds all remaining capacity", cap) + } + used := hps.SumVolumeSizes(kind) + available := capacity[kind] + if used+cap > available.Value() { + + return nil, status.Errorf(codes.OutOfRange, "requested capacity %d exceeds remaining capacity for %q, %s out of %s already used", + cap, kind, resource.NewQuantity(used, resource.BinarySI).String(), available.String()) + } + } else if kind != "" { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("capacity tracking disabled, specifying kind %q is invalid", kind)) + } + + path := GetVolumePath(volID, hps.DataRoot) + + switch volAccessType { + case MountAccess: + err := os.MkdirAll(path, 0777) + if err != nil { + return nil, err + } + case BlockAccess: + executor := utilexec.New() + size := fmt.Sprintf("%dM", cap/mib) + // Create a block file. + _, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + out, err := executor.Command("fallocate", "-l", size, path).CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to create block device: %v, %v", err, string(out)) + } + } else { + return nil, fmt.Errorf("failed to stat block device: %v, %v", path, err) + } + } + + // Associate block file with the loop device. + volPathHandler := volumepathhandler.VolumePathHandler{} + _, err = volPathHandler.AttachFileDevice(path) + if err != nil { + // Remove the block file because it'll no longer be used again. 
+ if err2 := os.Remove(path); err2 != nil { + glog.Errorf("failed to cleanup block file %s: %v", path, err2) + } + return nil, fmt.Errorf("failed to attach device %v: %v", path, err) + } + default: + return nil, fmt.Errorf("unsupported access type %v", volAccessType) + } + + hostpathVol := HostPathVolume{ + VolID: volID, + VolName: name, + VolSize: cap, + VolPath: path, + VolAccessType: volAccessType, + Ephemeral: ephemeral, + Kind: kind, + } + + glog.V(4).Infof("adding hostpath volume: %s = %+v", volID, hostpathVol) + hps.HostPathVolumes[volID] = hostpathVol + if err := hps.flushVolumesToFile(); err != nil { + return nil, fmt.Errorf("failed to store volume data into local file: %s. Error: %s", hps.VolumeDataFilePath, err) + + } + return &hostpathVol, nil +} + +func (hps *HostPathDriverState) UpdateVolume(volID string, volume HostPathVolume) error { + glog.V(4).Infof("updating hostpath volume: %s", volID) + + if _, err := hps.GetVolumeByID(volID); err != nil { + return err + } + + hps.HostPathVolumes[volID] = volume + if err := hps.flushVolumesToFile(); err != nil { + return fmt.Errorf("failed to update volume data into local file: %s. Error: %s", hps.VolumeDataFilePath, err) + + } + + return nil +} + +// deleteVolume deletes the directory for the hostpath volume. +func (hps *HostPathDriverState) DeleteVolume(volID string, capacity Capacity) error { + glog.V(4).Infof("starting to delete hostpath volume: %s", volID) + + vol, err := hps.GetVolumeByID(volID) + if err != nil { + // Return OK if the volume is not found. 
+ return nil + } + + if vol.VolAccessType == BlockAccess { + volPathHandler := volumepathhandler.VolumePathHandler{} + path := GetVolumePath(volID, hps.DataRoot) + glog.V(4).Infof("deleting loop device for file %s if it exists", path) + if err := volPathHandler.DetachFileDevice(path); err != nil { + return fmt.Errorf("failed to remove loop device for file %s: %v", path, err) + } + } + + path := GetVolumePath(volID, hps.DataRoot) + if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { + return err + } + + delete(hps.HostPathVolumes, volID) + if err := hps.flushVolumesToFile(); err != nil { + return fmt.Errorf("failed to update volume data into local file: %s. Error: %s", hps.VolumeDataFilePath, err) + + } + glog.V(4).Infof("deleted hostpath volume: %s = %+v", volID, vol) + return nil +} + +func (hps *HostPathDriverState) GetVolumeLocker() *sync.RWMutex { + return hps.VolumesFileRWLock +} + +func (hps *HostPathDriverState) SumVolumeSizes(kind string) (sum int64) { + for _, volume := range hps.HostPathVolumes { + if volume.Kind == kind { + sum += volume.VolSize + } + } + return +} + +// GetVolumePath returns the canonical path for hostpath volume +func GetVolumePath(volID string, dataRoot string) string { + return filepath.Join(dataRoot, volID) +} + +func (hps *HostPathDriverState) GetSortedVolumeIDs() []string { + ids := make([]string, len(hps.HostPathVolumes)) + index := 0 + for volId := range hps.HostPathVolumes { + ids[index] = volId + index += 1 + } + + sort.Strings(ids) + return ids +} + +func (hps *HostPathDriverState) flushVolumesToFile() error { + err := os.MkdirAll(hps.DataRoot, 0777) + if err != nil { + return err + } + + data, err := json.Marshal(hps.HostPathVolumes) + if err != nil { + glog.Errorf("failed to unmarshal existing volumes: %s", err) + return err + } + + _, err = os.Stat(hps.VolumeDataFilePath) + if err != nil { + _, err = os.Create(hps.VolumeDataFilePath) + if err != nil { + glog.Errorf("failed to create volume data file: %s", 
err) + a := err.Error() + fmt.Println(a) + return err + } + } + + err = ioutil.WriteFile(hps.VolumeDataFilePath, data, 0644) + if err != nil { + glog.Errorf("failed to discover existing volumes under %s: %v", hps.VolumeDataFilePath, err) + return err + } + + glog.V(4).Info("discover existing volumes successfully") + return nil +} + +func (hps *HostPathDriverState) ListSnapshots() (HostPathSnapshots, error) { + return hps.HostPathSnapshots, nil +} + +func (hps *HostPathDriverState) GetSnapshotByName(name string) (HostPathSnapshot, error) { + for _, snapshot := range hps.HostPathSnapshots { + if snapshot.Name == name { + return snapshot, nil + } + } + return HostPathSnapshot{}, status.Errorf(codes.NotFound, "snapshot name %s does not exist in the snapshots list", name) +} + +func (hps *HostPathDriverState) GetSnapshotByID(id string) (HostPathSnapshot, error) { + snapshot, ok := hps.HostPathSnapshots[id] + if !ok { + return HostPathSnapshot{}, status.Errorf(codes.NotFound, "snapshot id %s does not exist in the snapshots list", id) + } + + return snapshot, nil +} + +func (hps *HostPathDriverState) CreateSnapshot(name, snapshotId, volumeId, snapshotFilePath string, creationTime *timestamp.Timestamp, size int64, readyToUse bool) (HostPathSnapshot, error) { + snapshot := HostPathSnapshot{} + snapshot.Name = name + snapshot.Id = snapshotId + snapshot.VolID = volumeId + snapshot.Path = snapshotFilePath + snapshot.CreationTime = creationTime + snapshot.SizeBytes = size + snapshot.ReadyToUse = true + + hps.HostPathSnapshots[snapshotId] = snapshot + return snapshot, nil +} + +func (hps *HostPathDriverState) DeleteSnapshot(snapshotId string) error { + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. 
+ hps.SnapshotsFileRWLock.Lock() + defer hps.SnapshotsFileRWLock.Unlock() + + glog.V(4).Infof("deleting snapshot %s", snapshotId) + path := getSnapshotPath(snapshotId, hps.DataRoot) + os.RemoveAll(path) + delete(hps.HostPathSnapshots, snapshotId) + return nil +} + +func (hps *HostPathDriverState) GetSnapshotLocker() *sync.RWMutex { + return hps.SnapshotsFileRWLock +} + +func getSnapshotID(file string) (bool, string) { + glog.V(4).Infof("file: %s", file) + // Files with .snap extension are volumesnapshot files. + // e.g. foo.snap, foo.bar.snap + if filepath.Ext(file) == SnapshotExt { + return true, strings.TrimSuffix(file, SnapshotExt) + } + return false, "" +} + +// getSnapshotPath returns the full path to where the snapshot is stored +func getSnapshotPath(snapshotID, dataRoot string) string { + return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, SnapshotExt)) +} + +func LoadStateFromFile(dataRoot string) (*HostPathDriverState, error) { + driverState := NewHostPathDriverState(dataRoot) + + if err := driverState.loadVolumesFromFile(); err != nil { + return nil, err + } + + if err := driverState.loadSnapshotsFromFile(); err != nil { + return nil, err + } + + return driverState, nil +} + +func (hps *HostPathDriverState) loadVolumesFromFile() error { + hps.VolumesFileRWLock.RLock() + defer hps.VolumesFileRWLock.RUnlock() + glog.V(4).Infof("discovering existing volume data in %s", hps.VolumeDataFilePath) + data, err := ioutil.ReadFile(hps.VolumeDataFilePath) + if err != nil { + glog.Errorf("failed to discover existing volumes under %s: %v", hps.VolumeDataFilePath, err) + return err + } + + volumes := make(HostPathVolumes) + err = json.Unmarshal(data, &volumes) + if err != nil { + glog.Errorf("failed to unmarshal existing volumes: %s", err) + return err + } + + hps.HostPathVolumes = volumes + glog.V(4).Info("discover existing volumes successfully") + return nil +} + +func (hps *HostPathDriverState) loadSnapshotsFromFile() error { + glog.V(4).Infof("discovering 
existing snapshots in %s", hps.DataRoot) + files, err := ioutil.ReadDir(hps.DataRoot) + if err != nil { + glog.Errorf("failed to discover snapshots under %s: %v", hps.DataRoot, err) + return err + } + + for _, file := range files { + isSnapshot, snapshotID := getSnapshotID(file.Name()) + if isSnapshot { + glog.V(4).Infof("adding snapshot %s from file %s", snapshotID, getSnapshotPath(hps.DataRoot, snapshotID)) + hps.HostPathSnapshots[snapshotID] = HostPathSnapshot{ + Id: snapshotID, + Path: getSnapshotPath(hps.DataRoot, snapshotID), + ReadyToUse: true, + } + } + } + + return nil +} + +// Capacity simulates linear storage of certain types ("fast", +// "slow"). To calculate the amount of allocated space, the size of +// all currently existing volumes of the same kind is summed up. +// +// Available capacity is configurable with a command line flag +// -capacity = where is a string and +// is a quantity (1T, 1Gi). More than one of those +// flags can be used. +// +// The underlying map will be initialized if needed by Set, +// which makes it possible to define and use a Capacity instance +// without explicit initialization (`var capacity Capacity` or as +// member in a struct). +type Capacity map[string]resource.Quantity + +// Set is an implementation of flag.Value.Set. +func (c *Capacity) Set(arg string) error { + parts := strings.SplitN(arg, "=", 2) + if len(parts) != 2 { + return errors.New("must be of format =") + } + quantity, err := resource.ParseQuantity(parts[1]) + if err != nil { + return err + } + + // We overwrite any previous value. + if *c == nil { + *c = Capacity{} + } + (*c)[parts[0]] = quantity + return nil +} + +func (c *Capacity) String() string { + return fmt.Sprintf("%v", map[string]resource.Quantity(*c)) +} + +var _ flag.Value = &Capacity{} + +// Enabled returns true if capacities are configured. 
+func (c *Capacity) Enabled() bool { + return len(*c) > 0 +} diff --git a/pkg/state/state_test.go b/pkg/state/state_test.go new file mode 100644 index 000000000..9bfe6fa34 --- /dev/null +++ b/pkg/state/state_test.go @@ -0,0 +1,256 @@ +package state + +import ( + "testing" + + "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/assert" +) + +const ( + tmpDataRoot = "/tmp/csi-data-dir" +) + +var ( + testHostPathDriverState = NewHostPathDriverState(tmpDataRoot) + testVolumes = make(HostPathVolumes) + testSnapshots = make(HostPathSnapshots) +) + +func TestCreateVolumeByID(t *testing.T) { + maxVolumeSize := 100 * 1024 * 1024 // 10Mi + testVolume := HostPathVolume{ + VolID: "volume-1", + VolName: "volume-1", + VolSize: int64(maxVolumeSize), + VolAccessType: MountAccess, + Ephemeral: false, + Kind: "", + VolPath: "/tmp/csi-data-dir/volume-1", + } + + cap := make(Capacity) + _, err := testHostPathDriverState.CreateVolume( + testVolume.VolID, testVolume.VolName, int64(maxVolumeSize), + testVolume.VolAccessType, false, "", + int64(maxVolumeSize), cap) + + assert.Nil(t, err) + testVolumes[testVolume.VolID] = testVolume + + // Get volume from memory + volumeresult, err := testHostPathDriverState.GetVolumeByID(testVolume.VolID) + assert.Nil(t, err) + assert.EqualValues(t, testVolume, volumeresult) +} + +func TestListVolumes(t *testing.T) { + maxVolumeSize := 100 * 1024 * 1024 * 1024 // 1Gi + testVolume := HostPathVolume{ + VolID: "volume-2", + VolName: "volume-2", + VolSize: int64(maxVolumeSize), + VolAccessType: MountAccess, + Ephemeral: false, + Kind: "", + VolPath: "/tmp/csi-data-dir/volume-2", + } + + cap := make(Capacity) + _, err := testHostPathDriverState.CreateVolume( + testVolume.VolID, testVolume.VolName, int64(maxVolumeSize), + testVolume.VolAccessType, false, "", + int64(maxVolumeSize), cap) + + assert.Nil(t, err) + testVolumes[testVolume.VolID] = testVolume + + volumesResult, err := testHostPathDriverState.ListVolumes() + assert.Nil(t, err) + 
assert.EqualValues(t, testVolumes, volumesResult) +} + +func TestUpdateVolumes(t *testing.T) { + maxVolumeSize := 100 * 1024 // 1Ki + testVolume := HostPathVolume{ + VolID: "volume-3", + VolName: "volume-3", + VolSize: int64(maxVolumeSize), + VolAccessType: MountAccess, + Ephemeral: false, + Kind: "", + VolPath: "/tmp/csi-data-dir/volume-3", + } + cap := make(Capacity) + _, err := testHostPathDriverState.CreateVolume( + testVolume.VolID, testVolume.VolName, int64(maxVolumeSize), + testVolume.VolAccessType, false, "", + int64(maxVolumeSize), cap) + assert.Nil(t, err) + testVolumes[testVolume.VolID] = testVolume + + testVolume.Ephemeral = true + err = testHostPathDriverState.UpdateVolume(testVolume.VolID, testVolume) + assert.Nil(t, err) + // Get volume from memory + volumeresult, err := testHostPathDriverState.GetVolumeByID(testVolume.VolID) + assert.Nil(t, err) + assert.EqualValues(t, testVolume, volumeresult) +} + +func TestDeleteVolumes(t *testing.T) { + err := testHostPathDriverState.DeleteVolume("volume-3", make(Capacity)) + assert.Nil(t, err) + + delete(testVolumes, "volume-3") + + volumesResult, err := testHostPathDriverState.ListVolumes() + assert.Nil(t, err) + assert.EqualValues(t, testVolumes, volumesResult) +} + +func TestLoadVolumesFromFile(t *testing.T) { + err := testHostPathDriverState.loadVolumesFromFile() + assert.Nil(t, err) + assert.EqualValues(t, testVolumes, testHostPathDriverState.HostPathVolumes) +} + +func TestGetSnapshotID(t *testing.T) { + testCases := []struct { + name string + inputPath string + expectedIsSnapshot bool + expectedSnapshotID string + }{ + { + name: "should recognize foo.snap as a valid snapshot with ID foo", + inputPath: "foo.snap", + expectedIsSnapshot: true, + expectedSnapshotID: "foo", + }, + { + name: "should recognize baz.tar.gz as an invalid snapshot", + inputPath: "baz.tar.gz", + expectedIsSnapshot: false, + expectedSnapshotID: "", + }, + { + name: "should recognize baz.tar.snap as a valid snapshot with ID baz.tar", 
+ inputPath: "baz.tar.snap", + expectedIsSnapshot: true, + expectedSnapshotID: "baz.tar", + }, + { + name: "should recognize baz.tar.snap.snap as a valid snapshot with ID baz.tar.snap", + inputPath: "baz.tar.snap.snap", + expectedIsSnapshot: true, + expectedSnapshotID: "baz.tar.snap", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualIsSnapshot, actualSnapshotID := getSnapshotID(tc.inputPath) + if actualIsSnapshot != tc.expectedIsSnapshot { + t.Errorf("unexpected result for path %s, Want: %t, Got: %t", tc.inputPath, tc.expectedIsSnapshot, actualIsSnapshot) + } + if actualSnapshotID != tc.expectedSnapshotID { + t.Errorf("unexpected snapshotID for path %s, Want: %s; Got: %s", tc.inputPath, tc.expectedSnapshotID, actualSnapshotID) + } + }) + } +} + +func TestCreateSnapshot(t *testing.T) { + testSnapshot := HostPathSnapshot{ + Name: "snapshot-1", + Id: "snapshot-1", + VolID: "volume-1", + Path: getSnapshotPath("snapshot-1", testHostPathDriverState.DataRoot), + CreationTime: ptypes.TimestampNow(), + SizeBytes: 10 * 1024 * 1024, + ReadyToUse: true, + } + + snapshot, err := testHostPathDriverState.CreateSnapshot( + testSnapshot.Name, testSnapshot.Id, + testSnapshot.VolID, testSnapshot.Path, + testSnapshot.CreationTime, + testSnapshot.SizeBytes, testSnapshot.ReadyToUse) + + assert.Nil(t, err) + assert.EqualValues(t, testSnapshot, snapshot) + testSnapshots[testSnapshot.Id] = snapshot +} + +func TestListSnapshots(t *testing.T) { + testSnapshot := HostPathSnapshot{ + Name: "snapshot-2", + Id: "snapshot-2", + VolID: "volume-2", + Path: getSnapshotPath("snapshot-2", testHostPathDriverState.DataRoot), + CreationTime: ptypes.TimestampNow(), + SizeBytes: 10 * 1024 * 1024, + ReadyToUse: true, + } + + snapshot, err := testHostPathDriverState.CreateSnapshot( + testSnapshot.Name, testSnapshot.Id, + testSnapshot.VolID, testSnapshot.Path, + testSnapshot.CreationTime, + testSnapshot.SizeBytes, testSnapshot.ReadyToUse) + + assert.Nil(t, err) + 
assert.EqualValues(t, testSnapshot, snapshot) + testSnapshots[testSnapshot.Id] = snapshot + + snapshots, err := testHostPathDriverState.ListSnapshots() + assert.Nil(t, err) + assert.EqualValues(t, testSnapshots, snapshots) +} + +func TestGetSnapshotByID(t *testing.T) { + expectedSnapshot, ok := testSnapshots["snapshot-1"] + assert.True(t, ok) + + actualSnapshot, err := testHostPathDriverState.GetSnapshotByID("snapshot-1") + assert.Nil(t, err) + + assert.EqualValues(t, expectedSnapshot, actualSnapshot) +} + +func TestGetSnapshotByName(t *testing.T) { + expectedSnapshot, ok := testSnapshots["snapshot-2"] + assert.True(t, ok) + + actualSnapshot, err := testHostPathDriverState.GetSnapshotByName("snapshot-2") + assert.Nil(t, err) + + assert.EqualValues(t, expectedSnapshot, actualSnapshot) +} + +func TestDeleteSnapshot(t *testing.T) { + delete(testSnapshots, "snapshot-2") + + err := testHostPathDriverState.DeleteSnapshot("snapshot-2") + assert.Nil(t, err) + + snapshots, err := testHostPathDriverState.ListSnapshots() + assert.Nil(t, err) + assert.EqualValues(t, testSnapshots, snapshots) +} + +func TestLoadSnapshotsFromFile(t *testing.T) { + err := testHostPathDriverState.loadSnapshotsFromFile() + assert.Nil(t, err) + + snapshots, err := testHostPathDriverState.ListSnapshots() + assert.Nil(t, err) + assert.EqualValues(t, testSnapshots, snapshots) +} + +// func TestLoadStateFromFile(t *testing.T) { +// newdriverState, err := LoadStateFromFile(tmpDataRoot) +// assert.Nil(t, err) +// assert.EqualValues(t, *testHostPathDriverState, *newdriverState) +// } diff --git a/pkg/state/types.go b/pkg/state/types.go new file mode 100644 index 000000000..70404f9fa --- /dev/null +++ b/pkg/state/types.go @@ -0,0 +1,118 @@ +package state + +import ( + "path" + "sync" + + "github.com/golang/protobuf/ptypes/timestamp" +) + +type AccessType int + +const ( + MountAccess AccessType = iota + BlockAccess +) + +const ( + kib int64 = 1024 + mib int64 = kib * 1024 + gib int64 = mib * 1024 + gib100 
int64 = gib * 100 + tib int64 = gib * 1024 + tib100 int64 = tib * 100 + + // StorageKind is the special parameter which requests + // storage of a certain kind (only affects capacity checks). + StorageKind = "kind" +) + +var ( + // Extension with which snapshot files will be saved. + SnapshotExt = ".snap" +) + +const ( + deviceID = "deviceID" + maxStorageCapacity = tib +) + +type HostPathVolume struct { + VolName string `json:"volName"` + VolID string `json:"volID"` + VolSize int64 `json:"volSize"` + VolPath string `json:"volPath"` + VolAccessType AccessType `json:"volAccessType"` + ParentVolID string `json:"parentVolID,omitempty"` + ParentSnapID string `json:"parentSnapID,omitempty"` + Ephemeral bool `json:"ephemeral"` + NodeID string `json:"nodeID"` + Kind string `json:"kind"` + ReadOnlyAttach bool `json:"readOnlyAttach"` + IsAttached bool `json:"isAttached"` + IsStaged bool `json:"isStaged"` + IsPublished bool `json:"isPublished"` +} + +type HostPathVolumes map[string]HostPathVolume + +type HostPathSnapshot struct { + Name string + Id string + VolID string + Path string + CreationTime *timestamp.Timestamp + SizeBytes int64 + ReadyToUse bool +} + +type HostPathSnapshots map[string]HostPathSnapshot + +type HostPathDriverState struct { + HostPathVolumes HostPathVolumes + HostPathSnapshots HostPathSnapshots + + VolumesFileRWLock *sync.RWMutex + SnapshotsFileRWLock *sync.RWMutex + + DataRoot string + VolumeDataFilePath string +} + +func NewHostPathDriverState(dataRoot string) *HostPathDriverState { + return &HostPathDriverState{ + HostPathVolumes: make(HostPathVolumes), + HostPathSnapshots: make(HostPathSnapshots), + + VolumesFileRWLock: &sync.RWMutex{}, + SnapshotsFileRWLock: &sync.RWMutex{}, + DataRoot: dataRoot, + VolumeDataFilePath: path.Join(dataRoot, "volumes.json"), + } +} + +type DriverState interface { + VolumeState + SnapshotState +} + +type VolumeState interface { + ListVolumes() (HostPathVolumes, error) + GetVolumeByID(volID string) (HostPathVolume, 
error) + UpdateVolume(volID string, volume HostPathVolume) error + GetVolumeByName(volName string) (HostPathVolume, error) + DeleteVolume(volID string, capacity Capacity) error + CreateVolume(volID, name string, cap int64, volAccessType AccessType, ephemeral bool, kind string, maxVolumeSize int64, capacity Capacity) (hpv *HostPathVolume, finalErr error) + GetVolumeLocker() *sync.RWMutex + GetSortedVolumeIDs() []string + SumVolumeSizes(kind string) (sum int64) +} + +type SnapshotState interface { + ListSnapshots() (HostPathSnapshots, error) + GetSnapshotByName(name string) (HostPathSnapshot, error) + GetSnapshotByID(id string) (HostPathSnapshot, error) + CreateSnapshot(name, snapshotId, volumeId, snapshotFilePath string, creationTime *timestamp.Timestamp, size int64, readyToUse bool) (HostPathSnapshot, error) + DeleteSnapshot(snapshotId string) error + GetSnapshotLocker() *sync.RWMutex +}