Skip to content

Commit

Permalink
Add state to snapshot create and configurable retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Grant Griffiths <[email protected]>
  • Loading branch information
ggriffiths committed Aug 9, 2019
1 parent 850184d commit c86ff40
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 32 deletions.
Binary file added cmd/csi-snapshotter/csi-snapshotter
Binary file not shown.
6 changes: 6 additions & 0 deletions cmd/csi-snapshotter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ var (
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed snapshot creation. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed snapshot creation.")
failedSnapshotThreshold = flag.Int("failed-snapshot-threshold", 15, "The maximum number of retries on snapshot failures. Set 0 to retry indefinitely. Default is 15.")
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
Expand Down Expand Up @@ -191,6 +194,9 @@ func main() {
coreFactory.Core().V1().PersistentVolumeClaims(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
*retryIntervalStart,
*retryIntervalMax,
*failedSnapshotThreshold,
snapShotter,
*csiTimeout,
*resyncPeriod,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Handler is responsible for handling VolumeSnapshot events from informer.
type Handler interface {
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error)
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error)
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, time.Time, int64, error)
}
Expand Down Expand Up @@ -58,19 +58,20 @@ func NewCSIHandler(
}
}

func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {

ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()

snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
if err != nil {
return "", "", time.Time{}, 0, false, err
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, err
}
newParameters, err := removePrefixedParameters(parameters)
if err != nil {
return "", "", time.Time{}, 0, false, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
}

return handler.snapshotter.CreateSnapshot(ctx, snapshotName, volume, newParameters, snapshotterCredentials)
}

Expand Down
45 changes: 29 additions & 16 deletions pkg/controller/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storage "k8s.io/api/storage/v1beta1"
Expand Down Expand Up @@ -364,8 +365,8 @@ func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot
klog.V(5).Infof("createSnapshot[%s]: started", snapshotKey(snapshot))
opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.createSnapshotOperation(snapshot)
if err != nil {
snapshotObj, state, err := ctrl.createSnapshotOperation(snapshot)
if err != nil && state != snapshotter.SnapshottingFinished {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)
return err
Expand All @@ -375,7 +376,19 @@ func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot
// We will get a "snapshot update" event soon, this is not a big error
klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
}
return nil
if state == snapshotter.SnapshottingFinished {
// Snapshotting finished, remove obj from the queue.
ctrl.snapshotQueue.Done(snapshotObj)
return nil

} else if state == snapshotter.SnapshottingInBackground {
klog.V(4).Infof("createSnapshot [%s]: Temporary error received, adding Snapshot %s back in queue: %v", snapshotKey(snapshotObj), updateErr)
// Snapshotting still in progress.
return nil
} else {
// State is SnapshottingNoChange. Don't change the snapshot queue.
return nil
}
})
return nil
}
Expand Down Expand Up @@ -584,7 +597,7 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
}
driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
driverName, snapshotID, creationTime, size, readyToUse, _, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
klog.Errorf("checkandUpdateBoundSnapshotStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err)
return nil, err
Expand All @@ -611,30 +624,30 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, snapshotter.SnapshottingState, error) {
klog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot))

if snapshot.Status.Error != nil && !isControllerUpdateFailError(snapshot.Status.Error) {
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", snapshot.Status.Error.Message)
return snapshot, nil
return snapshot, snapshotter.SnapshottingFinished, nil
}

// If PVC is not being deleted and finalizer is not added yet, a finalizer should be added.
klog.V(5).Infof("createSnapshotOperation: Check if PVC is not being deleted and add Finalizer for source of snapshot [%s] if needed", snapshot.Name)
err := ctrl.ensureSnapshotSourceFinalizer(snapshot)
if err != nil {
klog.Errorf("createSnapshotOperation failed to add finalizer for source of snapshot %s", err)
return nil, err
return nil, snapshotter.SnapshottingFinished, err
}

class, volume, contentName, snapshotterCredentials, err := ctrl.getCreateSnapshotInput(snapshot)
if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
return nil, snapshotter.SnapshottingFinished, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
}

driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
driverName, snapshotID, creationTime, size, readyToUse, state, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
return nil, state, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
}

klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
Expand All @@ -651,16 +664,16 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
}

if err != nil {
return nil, err
return nil, snapshotter.SnapshottingFinished, err
}
// Create VolumeSnapshotContent in the database
volumeRef, err := ref.GetReference(scheme.Scheme, volume)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingFinished, err
}
snapshotRef, err := ref.GetReference(scheme.Scheme, snapshot)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingFinished, err
}

if class.DeletionPolicy == nil {
Expand Down Expand Up @@ -713,15 +726,15 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
strerr := fmt.Sprintf("Error creating volume snapshot content object for snapshot %s: %v.", snapshotKey(snapshot), err)
klog.Error(strerr)
ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr)
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
return nil, snapshotter.SnapshottingInBackground, newControllerUpdateError(snapshotKey(snapshot), err.Error())
}

// save succeeded, bind and update status for snapshot.
result, err := ctrl.bindandUpdateVolumeSnapshot(snapshotContent, newSnapshot)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingFinished, err
}
return result, nil
return result, snapshotter.SnapshottingFinished, nil
}

// Delete a snapshot
Expand Down
22 changes: 18 additions & 4 deletions pkg/controller/snapshot_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type csiSnapshotController struct {

createSnapshotContentRetryCount int
createSnapshotContentInterval time.Duration
retryIntervalStart time.Duration
retryIntervalMax time.Duration
failedSnapshotThreshold int
resyncPeriod time.Duration
}

Expand All @@ -82,6 +85,9 @@ func NewCSISnapshotController(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration,
retryIntervalStart time.Duration,
retryIntervalMax time.Duration,
failedSnapshotThreshold int,
snapshotter snapshotter.Snapshotter,
timeout time.Duration,
resyncPeriod time.Duration,
Expand All @@ -103,10 +109,13 @@ func NewCSISnapshotController(
runningOperations: goroutinemap.NewGoRoutineMap(true),
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
createSnapshotContentInterval: createSnapshotContentInterval,
retryIntervalStart: retryIntervalStart,
retryIntervalMax: retryIntervalMax,
failedSnapshotThreshold: failedSnapshotThreshold,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(retryIntervalStart, retryIntervalMax), "csi-snapshotter-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}

Expand Down Expand Up @@ -205,7 +214,6 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
if quit {
return true
}
defer ctrl.snapshotQueue.Done(keyObj)
key := keyObj.(string)
klog.V(5).Infof("snapshotWorker[%s]", key)

Expand Down Expand Up @@ -393,12 +401,18 @@ func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot
}
err = ctrl.syncSnapshot(snapshot)
if err != nil {
sKey := snapshotKey(snapshot)
if ctrl.failedSnapshotThreshold == 0 {
ctrl.snapshotQueue.AddRateLimited(sKey)
} else if ctrl.snapshotQueue.NumRequeues(sKey) < ctrl.failedSnapshotThreshold {
ctrl.snapshotQueue.AddRateLimited(sKey)
}
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err)
klog.V(3).Infof("could not sync claim %q: %+v", sKey, err)
} else {
klog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err)
klog.Errorf("could not sync volume %q: %+v", sKey, err)
}
}
}
Expand Down
62 changes: 55 additions & 7 deletions pkg/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ import (
csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"k8s.io/api/core/v1"

"k8s.io/klog"
)

// Snapshotter implements CreateSnapshot/DeleteSnapshot operations against a remote CSI driver.
type Snapshotter interface {
// CreateSnapshot creates a snapshot for a volume
CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp time.Time, size int64, readyToUse bool, err error)
CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp time.Time, size int64, readyToUse bool, snapshotterState SnapshottingState, err error)

// DeleteSnapshot deletes a snapshot from a volume
DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error)
Expand All @@ -47,23 +50,40 @@ type snapshot struct {
conn *grpc.ClientConn
}

// SnapshottingState describes where a snapshot operation stands when a
// CreateSnapshot() call returns. It tells the controller whether snapshotting
// could still be in progress in the background or is 100% finished (with or
// without success).
type SnapshottingState string

const (
// SnapshottingInBackground tells the controller that snapshotting may still
// be in progress after CreateSnapshot exits.
SnapshottingInBackground SnapshottingState = "Background"
// SnapshottingFinished tells the controller that snapshotting is not running
// in the background and has exited, either successfully or with errors.
SnapshottingFinished SnapshottingState = "Finished"
// SnapshottingNoChange tells the controller that the snapshotting status has
// not changed since CreateSnapshot was called.
// NOTE(review): no code visible in this change returns this value — confirm
// it is still needed.
SnapshottingNoChange SnapshottingState = "NoChange"
)

// NewSnapshotter returns a Snapshotter backed by the given gRPC client
// connection.
func NewSnapshotter(conn *grpc.ClientConn) Snapshotter {
	s := snapshot{conn: conn}
	return &s
}

func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, SnapshottingState, error) {
klog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName)
if volume.Spec.CSI == nil {
return "", "", time.Time{}, 0, false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
return "", "", time.Time{}, 0, false, SnapshottingFinished, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
}

client := csi.NewControllerClient(s.conn)

driverName, err := csirpc.GetDriverName(ctx, s.conn)
if err != nil {
return "", "", time.Time{}, 0, false, err
return "", "", time.Time{}, 0, false, SnapshottingFinished, err
}

req := csi.CreateSnapshotRequest{
Expand All @@ -75,15 +95,43 @@ func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volu

rsp, err := client.CreateSnapshot(ctx, &req)
if err != nil {
return "", "", time.Time{}, 0, false, err
if isFinalError(err) {
return "", "", time.Time{}, 0, false, SnapshottingFinished, err
}
return "", "", time.Time{}, 0, false, SnapshottingFinished, err
}

klog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] readyToUse [%v]", snapshotName, driverName, rsp.Snapshot.SnapshotId, rsp.Snapshot.CreationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse)
creationTime, err := ptypes.Timestamp(rsp.Snapshot.CreationTime)
if err != nil {
return "", "", time.Time{}, 0, false, err
return "", "", time.Time{}, 0, false, SnapshottingFinished, err
}
return driverName, rsp.Snapshot.SnapshotId, creationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse, nil
return driverName, rsp.Snapshot.SnapshotId, creationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse, SnapshottingFinished, nil
}

// isFinalError reports whether err, as returned by the CSI CreateSnapshot
// call, means that no snapshot can still be in progress in the background.
//
// It returns false for errors that are not gRPC statuses and for the gRPC
// codes Canceled, DeadlineExceeded, Unavailable, ResourceExhausted and
// Aborted. Every other status code yields true.
//
// The classification is based on:
// https://github.com/kubernetes-csi/external-provisioner/commit/8203a03c47ce2b86a2a2c2421d74345b76183b14
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
// https://github.com/container-storage-interface/spec/blob/master/spec.md
func isFinalError(err error) bool {
	grpcStatus, isGRPC := status.FromError(err)
	if !isGRPC {
		// Not a gRPC error: the operation must have failed before the gRPC
		// method was called. We cannot tell whether an earlier CreateSnapshot
		// is still in progress, so stay on the safe side and treat the error
		// as non-final.
		return false
	}

	switch grpcStatus.Code() {
	case codes.Canceled, // gRPC: the client application cancelled the request.
		codes.DeadlineExceeded,  // gRPC: timeout.
		codes.Unavailable,       // gRPC: server shutting down or TCP connection broken; a previous CreateSnapshot() may still be in progress.
		codes.ResourceExhausted, // gRPC: server temporarily out of resources; a previous CreateSnapshot() may still be in progress.
		codes.Aborted:           // CSI: an operation is already pending.
		return false
	default:
		// Any other code means snapshotting either never started or has
		// failed; it is certainly not still in progress.
		return true
	}
}

func (s *snapshot) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ func TestCreateSnapshot(t *testing.T) {
}

s := NewSnapshotter(csiConn)
driverName, snapshotId, timestamp, size, readyToUse, err := s.CreateSnapshot(context.Background(), test.snapshotName, test.volume, test.parameters, test.secrets)
driverName, snapshotId, timestamp, size, readyToUse, snapState, err := s.CreateSnapshot(context.Background(), test.snapshotName, test.volume, test.parameters, test.secrets)
_ = snapState
if test.expectError && err == nil {
t.Errorf("test %q: Expected error, got none", test.name)
}
Expand Down

0 comments on commit c86ff40

Please sign in to comment.