dump and restore internal state
This replaces the previous approach of trying to reconstruct state
from observations with a simpler dump/restore of the internal state
as a JSON file in the driver's data directory.

The advantage is that *all* volume and snapshot attributes get
restored, not just those that can be deduced from mount points.

No attempt is made to restore state properly after a node reboot.
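
The core of the new approach is plain JSON marshalling of the driver's in-memory objects. Below is a minimal sketch of the idea; the state file path, the trimmed-down Volume/Snapshot structs, and the helper names are assumptions for illustration, not the actual pkg/state code:

    // Sketch only: illustrates dump/restore of internal state as JSON.
    package state

    import (
        "encoding/json"
        "fmt"
        "os"
    )

    // Volume and Snapshot are trimmed down here; the real types carry
    // many more attributes, all of which survive a restart unchanged.
    type Volume struct {
        VolID   string
        VolName string
        VolSize int64
    }

    type Snapshot struct {
        Id    string
        Name  string
        VolID string
    }

    type resources struct {
        Volumes   []Volume
        Snapshots []Snapshot
    }

    // dump writes the complete in-memory state into the data directory.
    func dump(statefilePath string, r *resources) error {
        data, err := json.Marshal(r)
        if err != nil {
            return fmt.Errorf("error encoding volumes and snapshots: %v", err)
        }
        if err := os.WriteFile(statefilePath, data, 0600); err != nil {
            return fmt.Errorf("error writing state file: %v", err)
        }
        return nil
    }

    // restore reads the state back after a driver restart; a missing
    // file simply means that nothing has been stored yet.
    func restore(statefilePath string, r *resources) error {
        data, err := os.ReadFile(statefilePath)
        if os.IsNotExist(err) {
            return nil
        }
        if err != nil {
            return fmt.Errorf("error reading state file: %v", err)
        }
        return json.Unmarshal(data, r)
    }

Because every attribute is serialized wholesale, nothing has to be reverse-engineered from mount points after a restart.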
pohly committed Apr 20, 2021
1 parent e4d72e3 commit eefb2c5
Showing 24 changed files with 4,563 additions and 444 deletions.
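
In the diffs below, direct accesses to the old in-memory maps (hp.volumes, hp.snapshots) become calls through a new hp.state accessor backed by pkg/state, which persists every mutation to the JSON state file. Inferred from the call sites, that accessor exposes roughly the following interface; the exact signatures and full method set are assumptions, not the actual pkg/state API:

    // Inferred from call sites in this commit; method names match the
    // hunks below, exact signatures are assumptions.
    type State interface {
        GetVolumeByID(volID string) (Volume, error)
        GetVolumeByName(volName string) (Volume, error)
        GetVolumes() []Volume
        // UpdateVolume persists the change before returning, so call
        // sites no longer mutate a map directly.
        UpdateVolume(volume Volume) error

        GetSnapshotByID(snapshotID string) (Snapshot, error)
        GetSnapshotByName(name string) (Snapshot, error)
        GetSnapshots() []Snapshot
        UpdateSnapshot(snapshot Snapshot) error
        DeleteSnapshot(snapshotID string) error
    }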
1 change: 1 addition & 0 deletions cmd/hostpathplugin/main.go
@@ -46,6 +46,7 @@ func main() {

flag.StringVar(&cfg.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
flag.StringVar(&cfg.DriverName, "drivername", "hostpath.csi.k8s.io", "name of the driver")
flag.StringVar(&cfg.StateDir, "statedir", "/csi-data-dir", "directory for storing state information (volumes and snapshots) across driver restarts")
flag.StringVar(&cfg.NodeID, "nodeid", "", "node id")
flag.BoolVar(&cfg.Ephemeral, "ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
flag.Int64Var(&cfg.MaxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
2 changes: 1 addition & 1 deletion go.mod
@@ -9,7 +9,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/kubernetes-csi/csi-lib-utils v0.9.0
github.com/pborman/uuid v1.2.1
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect
google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4 // indirect
3 changes: 2 additions & 1 deletion go.sum
@@ -505,8 +505,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/thecodeteam/goscaleio v0.1.0/go.mod h1:68sdkZAsK8bvEwBlbQnlLS+xU+hvLYM/iQ8KXej1AwM=
109 changes: 54 additions & 55 deletions pkg/hostpath/controllerserver.go
@@ -20,7 +20,6 @@ import (
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"

@@ -35,17 +34,12 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
utilexec "k8s.io/utils/exec"
)

const (
deviceID = "deviceID"
"github.com/kubernetes-csi/csi-driver-host-path/pkg/state"
)

type accessType int

const (
mountAccess accessType = iota
blockAccess
deviceID = "deviceID"
)

func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) {
@@ -84,13 +78,13 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
return nil, status.Error(codes.InvalidArgument, "cannot have both block and mount access type")
}

var requestedAccessType accessType
var requestedAccessType state.AccessType

if accessTypeBlock {
requestedAccessType = blockAccess
requestedAccessType = state.BlockAccess
} else {
// Default to mount.
requestedAccessType = mountAccess
requestedAccessType = state.MountAccess
}

// Lock before acting on global state. A production-quality
@@ -106,7 +100,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque

// Need to check for already existing volume name, and if found
// check for the requested capacity and already allocated capacity
if exVol, err := hp.getVolumeByName(req.GetName()); err == nil {
if exVol, err := hp.state.GetVolumeByName(req.GetName()); err == nil {
// Since err is nil, it means the volume with the same name already exists
// need to check if the size of existing volume is the same as in new
// request
@@ -149,7 +143,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath)

if req.GetVolumeContentSource() != nil {
path := getVolumePath(volumeID)
path := hp.getVolumePath(volumeID)
volumeSource := req.VolumeContentSource
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
@@ -203,7 +197,7 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque
defer hp.mutex.Unlock()

volId := req.GetVolumeId()
vol, err := hp.getVolumeByID(volId)
vol, err := hp.state.GetVolumeByID(volId)
if err != nil {
// Volume not found: might have already deleted
return &csi.DeleteVolumeResponse{}, nil
@@ -243,7 +237,7 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
hp.mutex.Lock()
defer hp.mutex.Unlock()

if _, err := hp.getVolumeByID(req.GetVolumeId()); err != nil {
if _, err := hp.state.GetVolumeByID(req.GetVolumeId()); err != nil {
return nil, err
}

@@ -287,7 +281,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
hp.mutex.Lock()
defer hp.mutex.Unlock()

vol, err := hp.getVolumeByID(req.VolumeId)
vol, err := hp.state.GetVolumeByID(req.VolumeId)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
@@ -311,8 +305,8 @@

vol.IsAttached = true
vol.ReadOnlyAttach = req.GetReadonly()
if err := hp.updateVolume(vol.VolID, vol); err != nil {
return nil, status.Errorf(codes.Internal, "failed to update volume %s: %v", vol.VolID, err)
if err := hp.state.UpdateVolume(vol); err != nil {
return nil, err
}

return &csi.ControllerPublishVolumeResponse{
@@ -337,7 +331,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
hp.mutex.Lock()
defer hp.mutex.Unlock()

vol, err := hp.getVolumeByID(req.VolumeId)
vol, err := hp.state.GetVolumeByID(req.VolumeId)
if err != nil {
// Not an error: a non-existent volume is not published.
// See also https://github.com/kubernetes-csi/external-attacher/pull/165
@@ -351,7 +345,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
}

vol.IsAttached = false
if err := hp.updateVolume(vol.VolID, vol); err != nil {
if err := hp.state.UpdateVolume(vol); err != nil {
return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err)
}

@@ -399,15 +393,20 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest

var (
startIdx, volumesLength, maxLength int64
hpVolume hostPathVolume
hpVolume state.Volume
)

// Lock before acting on global state. A production-quality
// driver might use more fine-grained locking.
hp.mutex.Lock()
defer hp.mutex.Unlock()

volumeIds := hp.getSortedVolumeIDs()
// Sort by volume ID.
volumes := hp.state.GetVolumes()
sort.Slice(volumes, func(i, j int) bool {
return volumes[i].VolID < volumes[j].VolID
})

if req.StartingToken == "" {
req.StartingToken = "1"
}
@@ -417,16 +416,16 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest
return nil, status.Error(codes.Aborted, "The type of startingToken should be integer")
}

volumesLength = int64(len(volumeIds))
volumesLength = int64(len(volumes))
maxLength = int64(req.MaxEntries)

if maxLength > volumesLength || maxLength <= 0 {
maxLength = volumesLength
}

for index := startIdx - 1; index < volumesLength && index < maxLength; index++ {
hpVolume = hp.volumes[volumeIds[index]]
healthy, msg := hp.doHealthCheckInControllerSide(volumeIds[index])
hpVolume = volumes[index]
healthy, msg := hp.doHealthCheckInControllerSide(hpVolume.VolID)
glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy)
volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
@@ -453,7 +452,7 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller
hp.mutex.Lock()
defer hp.mutex.Unlock()

volume, err := hp.getVolumeByID(req.GetVolumeId())
volume, err := hp.state.GetVolumeByID(req.GetVolumeId())
if err != nil {
return nil, err
}
@@ -475,11 +474,6 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller
}, nil
}

// getSnapshotPath returns the full path to where the snapshot is stored
func getSnapshotPath(snapshotID string) string {
return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, snapshotExt))
}

// CreateSnapshot uses the tar command to create a snapshot of a hostpath volume. The tar command can quickly create
// archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin.
func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
@@ -503,7 +497,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR

// Need to check for already existing snapshot name, and if found check for the
// requested sourceVolumeId and sourceVolumeId of snapshot that has been created.
if exSnap, err := hp.getSnapshotByName(req.GetName()); err == nil {
if exSnap, err := hp.state.GetSnapshotByName(req.GetName()); err == nil {
// Since err is nil, it means the snapshot with the same name already exists need
// to check if the sourceVolumeId of existing snapshot is the same as in new request.
if exSnap.VolID == req.GetSourceVolumeId() {
@@ -522,18 +516,18 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
}

volumeID := req.GetSourceVolumeId()
hostPathVolume, err := hp.getVolumeByID(volumeID)
hostPathVolume, err := hp.state.GetVolumeByID(volumeID)
if err != nil {
return nil, err
}

snapshotID := uuid.NewUUID().String()
creationTime := ptypes.TimestampNow()
volPath := hostPathVolume.VolPath
file := getSnapshotPath(snapshotID)
file := hp.getSnapshotPath(snapshotID)

var cmd []string
if hostPathVolume.VolAccessType == blockAccess {
if hostPathVolume.VolAccessType == state.BlockAccess {
glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume")
cmd = []string{"cp", volPath, file}
} else {
@@ -547,7 +541,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
}

glog.V(4).Infof("create volume snapshot %s", file)
snapshot := hostPathSnapshot{}
snapshot := state.Snapshot{}
snapshot.Name = req.GetName()
snapshot.Id = snapshotID
snapshot.VolID = volumeID
@@ -556,8 +550,9 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
snapshot.SizeBytes = hostPathVolume.VolSize
snapshot.ReadyToUse = true

hp.snapshots[snapshotID] = snapshot

if err := hp.state.UpdateSnapshot(snapshot); err != nil {
return nil, err
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SnapshotId: snapshot.Id,
@@ -587,9 +582,11 @@ func (hp *hostPath) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotR
defer hp.mutex.Unlock()

glog.V(4).Infof("deleting snapshot %s", snapshotID)
path := getSnapshotPath(snapshotID)
path := hp.getSnapshotPath(snapshotID)
os.RemoveAll(path)
delete(hp.snapshots, snapshotID)
if err := hp.state.DeleteSnapshot(snapshotID); err != nil {
return nil, err
}
return &csi.DeleteSnapshotResponse{}, nil
}

@@ -604,33 +601,35 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq
hp.mutex.Lock()
defer hp.mutex.Unlock()

// case 1: SnapshotId is not empty, return snapshots that match the snapshot id.
// case 1: SnapshotId is not empty, return snapshots that match the snapshot id,
// none if not found.
if len(req.GetSnapshotId()) != 0 {
snapshotID := req.SnapshotId
if snapshot, ok := hp.snapshots[snapshotID]; ok {
if snapshot, err := hp.state.GetSnapshotByID(snapshotID); err == nil {
return convertSnapshot(snapshot), nil
}
return &csi.ListSnapshotsResponse{}, nil
}

// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id.
// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id,
// none if not found.
if len(req.GetSourceVolumeId()) != 0 {
for _, snapshot := range hp.snapshots {
for _, snapshot := range hp.state.GetSnapshots() {
if snapshot.VolID == req.SourceVolumeId {
return convertSnapshot(snapshot), nil
}
}
return &csi.ListSnapshotsResponse{}, nil
}

var snapshots []csi.Snapshot
// case 3: no parameter is set, so we return all the snapshots.
sortedKeys := make([]string, 0)
for k := range hp.snapshots {
sortedKeys = append(sortedKeys, k)
}
sort.Strings(sortedKeys)
hpSnapshots := hp.state.GetSnapshots()
sort.Slice(hpSnapshots, func(i, j int) bool {
return hpSnapshots[i].Id < hpSnapshots[j].Id
})

for _, key := range sortedKeys {
snap := hp.snapshots[key]
for _, snap := range hpSnapshots {
snapshot := csi.Snapshot{
SnapshotId: snap.Id,
SourceVolumeId: snap.VolID,
@@ -725,15 +724,15 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
hp.mutex.Lock()
defer hp.mutex.Unlock()

exVol, err := hp.getVolumeByID(volID)
exVol, err := hp.state.GetVolumeByID(volID)
if err != nil {
return nil, err
}

if exVol.VolSize < capacity {
exVol.VolSize = capacity
if err := hp.updateVolume(volID, exVol); err != nil {
return nil, fmt.Errorf("could not update volume %s: %w", volID, err)
if err := hp.state.UpdateVolume(exVol); err != nil {
return nil, err
}
}

@@ -743,7 +742,7 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
}, nil
}

func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse {
func convertSnapshot(snap state.Snapshot) *csi.ListSnapshotsResponse {
entries := []*csi.ListSnapshotsResponse_Entry{
{
Snapshot: &csi.Snapshot{
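
As the CreateSnapshot comment above explains, snapshots are taken by shelling out: cp for a raw block volume, tar for a mounted directory. A standalone sketch of that command selection follows; the tar arguments are an assumption, since the hunk elides them:

    // Sketch of the snapshot command built in CreateSnapshot; the tar
    // flags are assumed because the hunk above does not show them.
    func snapshotCmd(accessType state.AccessType, volPath, file string) []string {
        if accessType == state.BlockAccess {
            // A raw block volume is backed by a single file: copy it.
            return []string{"cp", volPath, file}
        }
        // A mounted volume is a directory: archive its contents.
        return []string{"tar", "czf", file, "-C", volPath, "."}
    }

The resulting slice is executed via k8s.io/utils/exec, e.g. utilexec.New().Command(cmd[0], cmd[1:]...).CombinedOutput(), matching the executor already imported in this file.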
