diff --git a/.changelog/12113.txt b/.changelog/12113.txt
new file mode 100644
index 00000000000..17f55ee6afe
--- /dev/null
+++ b/.changelog/12113.txt
@@ -0,0 +1,7 @@
+```release-note:bug
+csi: Fixed a bug where allocations with volume claims would fail their first placement after a reschedule
+```
+
+```release-note:bug
+csi: Fixed a bug where allocations with volume claims would fail to restore after a client restart
+```
diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index 0594002ec00..6e4117250dc 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -3,6 +3,7 @@ package allocrunner
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
@@ -28,6 +29,7 @@ type csiHook struct {
 	nodeSecret string
 
 	volumeRequests     map[string]*volumeAndRequest
+	minBackoffInterval time.Duration
 	maxBackoffInterval time.Duration
 	maxBackoffDuration time.Duration
 }
@@ -47,6 +49,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
 		updater:            updater,
 		nodeSecret:         nodeSecret,
 		volumeRequests:     map[string]*volumeAndRequest{},
+		minBackoffInterval: time.Second,
 		maxBackoffInterval: time.Minute,
 		maxBackoffDuration: time.Hour * 24,
 	}
@@ -213,11 +216,10 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
 			},
 		}
 
-		var resp structs.CSIVolumeClaimResponse
-		if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
+		resp, err := c.claimWithRetry(req)
+		if err != nil {
 			return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
 		}
-
 		if resp.Volume == nil {
 			return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
 		}
@@ -230,6 +232,74 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
 	return result, nil
 }
 
+// claimWithRetry tries to claim the volume on the server, retrying
+// with exponential backoff capped to a maximum interval
+func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {
+
+	// note: allocrunner hooks don't have access to the client's
+	// shutdown context, just the allocrunner's shutdown; if we make
+	// it available in the future we should thread it through here so
+	// that retry can exit gracefully instead of dropping the
+	// in-flight goroutine
+	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
+	defer cancel()
+
+	var resp structs.CSIVolumeClaimResponse
+	var err error
+	backoff := c.minBackoffInterval
+	t, stop := helper.NewSafeTimer(0)
+	defer stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, err
+		case <-t.C:
+		}
+
+		err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
+		if err == nil {
+			break
+		}
+
+		if !isRetryableClaimRPCError(err) {
+			break
+		}
+
+		if backoff < c.maxBackoffInterval {
+			backoff = backoff * 2
+			if backoff > c.maxBackoffInterval {
+				backoff = c.maxBackoffInterval
+			}
+		}
+		c.logger.Debug(
+			"volume could not be claimed because it is in use, retrying", "backoff", backoff)
+		t.Reset(backoff)
+	}
+	return &resp, err
+}
+
+// isRetryableClaimRPCError looks for errors where we need to retry
+// with backoff because we expect them to be eventually resolved.
+func isRetryableClaimRPCError(err error) bool {
+
+	// note: because these errors are returned via RPC which breaks error
+	// wrapping, we can't check with errors.Is and need to read the string
+	errMsg := err.Error()
+	if strings.Contains(errMsg, structs.ErrCSIVolumeMaxClaims.Error()) {
+		return true
+	}
+	if strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) {
+		return true
+	}
+	if strings.Contains(errMsg, "no servers") {
+		return true
+	}
+	if strings.Contains(errMsg, structs.ErrNoLeader.Error()) {
+		return true
+	}
+	return false
+}
+
 func (c *csiHook) shouldRun() bool {
 	tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
 	for _, vol := range tg.Volumes {
@@ -286,7 +356,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
 	defer cancel()
 	var err error
-	backoff := time.Second
+	backoff := c.minBackoffInterval
 	t, stop := helper.NewSafeTimer(0)
 	defer stop()
 	for {
@@ -307,6 +377,8 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 				backoff = c.maxBackoffInterval
 			}
 		}
+		c.logger.Debug(
+			"volume could not be unmounted, retrying", "backoff", backoff)
 		t.Reset(backoff)
 	}
 	return nil
diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go
index d05d07385c3..6ed9270d5df 100644
--- a/client/allocrunner/csi_hook_test.go
+++ b/client/allocrunner/csi_hook_test.go
@@ -7,12 +7,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/pluginmanager"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -34,6 +36,9 @@ func TestCSIHook(t *testing.T) {
 	testcases := []struct {
 		name                   string
 		volumeRequests         map[string]*structs.VolumeRequest
+		startsUnschedulable    bool
+		startsWithClaims       bool
+		expectedClaimErr       error
 		expectedMounts         map[string]*csimanager.MountInfo
 		expectedMountCalls     int
 		expectedUnmountCalls   int
@@ -89,6 +94,58 @@ func TestCSIHook(t *testing.T) {
 			expectedUnpublishCalls: 1,
 		},
 
+		{
+			name: "fatal error on claim",
+			volumeRequests: map[string]*structs.VolumeRequest{
+				"vol0": {
+					Name:           "vol0",
+					Type:           structs.VolumeTypeCSI,
+					Source:         "testvolume0",
+					ReadOnly:       true,
+					AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+					AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+					MountOptions:   &structs.CSIMountOptions{},
+					PerAlloc:       false,
+				},
+			},
+			startsUnschedulable: true,
+			expectedMounts: map[string]*csimanager.MountInfo{
+				"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
+					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
+			},
+			expectedMountCalls:     0,
+			expectedUnmountCalls:   0,
+			expectedClaimCalls:     1,
+			expectedUnpublishCalls: 0,
+			expectedClaimErr: errors.New(
+				"claim volumes: could not claim volume testvolume0: volume is currently unschedulable"),
+		},
+
+		{
+			name: "retryable error on claim",
+			volumeRequests: map[string]*structs.VolumeRequest{
+				"vol0": {
+					Name:           "vol0",
+					Type:           structs.VolumeTypeCSI,
+					Source:         "testvolume0",
+					ReadOnly:       true,
+					AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+					AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+					MountOptions:   &structs.CSIMountOptions{},
+					PerAlloc:       false,
+				},
+			},
+			startsWithClaims: true,
+			expectedMounts: map[string]*csimanager.MountInfo{
+				"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
+					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
+			},
+			expectedMountCalls:     1,
+			expectedUnmountCalls:   1,
+			expectedClaimCalls:     2,
+			expectedUnpublishCalls: 1,
+		},
+
 		// TODO: this won't actually work on the client.
 		// https://github.com/hashicorp/nomad/issues/11798
 		//
@@ -136,7 +193,12 @@ func TestCSIHook(t *testing.T) {
 
 			callCounts := map[string]int{}
 			mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
-			rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
+			rpcer := mockRPCer{
+				alloc:            alloc,
+				callCounts:       callCounts,
+				hasExistingClaim: helper.BoolToPtr(tc.startsWithClaims),
+				schedulable:      helper.BoolToPtr(!tc.startsUnschedulable),
+			}
 			ar := mockAllocRunner{
 				res: &cstructs.AllocHookResources{},
 				caps: &drivers.Capabilities{
@@ -145,17 +207,24 @@ func TestCSIHook(t *testing.T) {
 				},
 			}
 			hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
-			hook.maxBackoffInterval = 100 * time.Millisecond
-			hook.maxBackoffDuration = 2 * time.Second
+			hook.minBackoffInterval = 1 * time.Millisecond
+			hook.maxBackoffInterval = 10 * time.Millisecond
+			hook.maxBackoffDuration = 500 * time.Millisecond
 			require.NotNil(t, hook)
 
-			require.NoError(t, hook.Prerun())
-			mounts := ar.GetAllocHookResources().GetCSIMounts()
-			require.NotNil(t, mounts)
-			require.Equal(t, tc.expectedMounts, mounts)
+			if tc.expectedClaimErr != nil {
+				require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
+				mounts := ar.GetAllocHookResources().GetCSIMounts()
+				require.Nil(t, mounts)
+			} else {
+				require.NoError(t, hook.Prerun())
+				mounts := ar.GetAllocHookResources().GetCSIMounts()
+				require.NotNil(t, mounts)
+				require.Equal(t, tc.expectedMounts, mounts)
+				require.NoError(t, hook.Postrun())
+			}
 
-			require.NoError(t, hook.Postrun())
 
 			require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
 			require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
 			require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
@@ -168,25 +237,11 @@ func TestCSIHook(t *testing.T) {
 
 // HELPERS AND MOCKS
 
-func testVolume(id string) *structs.CSIVolume {
-	vol := structs.NewCSIVolume(id, 0)
-	vol.Schedulable = true
-	vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
-		{
-			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
-		},
-		{
-			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
-		},
-	}
-	return vol
-}
-
 type mockRPCer struct {
-	alloc      *structs.Allocation
-	callCounts map[string]int
+	alloc            *structs.Allocation
+	callCounts       map[string]int
+	hasExistingClaim *bool
+	schedulable      *bool
 }
 
 // RPC mocks the server RPCs, acting as though any request succeeds
@@ -195,7 +250,7 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
 	case "CSIVolume.Claim":
 		r.callCounts["claim"]++
 		req := args.(*structs.CSIVolumeClaimRequest)
-		vol := testVolume(req.VolumeID)
+		vol := r.testVolume(req.VolumeID)
 		err := vol.Claim(req.ToClaim(), r.alloc)
 		if err != nil {
 			return err
@@ -215,6 +270,44 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
 	return nil
 }
 
+// testVolume is a helper that optionally starts as unschedulable /
+// claimed until after the first claim RPC is made, so that we can
+// test retryable vs non-retryable failures
+func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
+	vol := structs.NewCSIVolume(id, 0)
+	vol.Schedulable = *r.schedulable
+	vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
+		{
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+		},
+		{
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
+		},
+	}
+
+	if *r.hasExistingClaim {
+		vol.AccessMode = structs.CSIVolumeAccessModeSingleNodeReader
+		vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
+		vol.ReadClaims["another-alloc-id"] = &structs.CSIVolumeClaim{
+			AllocationID:   "another-alloc-id",
+			NodeID:         "another-node-id",
+			Mode:           structs.CSIVolumeClaimRead,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+			State:          structs.CSIVolumeClaimStateTaken,
+		}
+	}
+
+	if r.callCounts["claim"] >= 0 {
+		*r.hasExistingClaim = false
+		*r.schedulable = true
+	}
+
+	return vol
+}
+
 type mockVolumeMounter struct {
 	callCounts map[string]int
 }
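
Reviewer note: the loop added in `claimWithRetry` (and already used by `unmountWithRetry`) is a standard capped-exponential-backoff retry. The standalone sketch below restates that shape outside of Nomad so the three knobs are easy to see; `retryWithBackoff`, `doRPC`, `retryable`, and the interval values are illustrative placeholders for this note, not code from the patch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff mirrors the retry loop in csi_hook.go: wait, make the
// call, and on a retryable error double the wait up to maxInterval,
// giving up for good once maxDuration has elapsed.
func retryWithBackoff(minInterval, maxInterval, maxDuration time.Duration,
	doRPC func() error, retryable func(error) bool) error {

	ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
	defer cancel()

	backoff := minInterval
	t := time.NewTimer(0) // fire immediately for the first attempt
	defer t.Stop()

	var err error
	for {
		select {
		case <-ctx.Done():
			return err // overall deadline hit; surface the last error seen
		case <-t.C:
		}

		err = doRPC()
		if err == nil || !retryable(err) {
			return err
		}

		// double the backoff, capped at maxInterval
		if backoff < maxInterval {
			backoff *= 2
			if backoff > maxInterval {
				backoff = maxInterval
			}
		}
		t.Reset(backoff)
	}
}

func main() {
	// Same tuned-down intervals the test uses, against a call that
	// fails twice with a transient error and then succeeds.
	attempts := 0
	err := retryWithBackoff(time.Millisecond, 10*time.Millisecond, 500*time.Millisecond,
		func() error {
			attempts++
			if attempts < 3 {
				return errors.New("volume max claims reached")
			}
			return nil
		},
		func(err error) bool { return true },
	)
	fmt.Println(attempts, err) // 3 <nil>
}
```

Like the hook, the sketch returns the last observed error when the overall deadline expires, so callers see why the final attempt failed rather than a bare context error.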