Backport of CSI: retry claims from client into release/1.1.x #12660

Merged
7 changes: 7 additions & 0 deletions .changelog/12113.txt
@@ -0,0 +1,7 @@
```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail their first placement after a reschedule
```

```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail to restore after a client restart
```
80 changes: 76 additions & 4 deletions client/allocrunner/csi_hook.go
@@ -3,6 +3,7 @@ package allocrunner
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -28,6 +29,7 @@ type csiHook struct {
nodeSecret string

volumeRequests map[string]*volumeAndRequest
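// minBackoffInterval and maxBackoffInterval bound each retry delay,
// and maxBackoffDuration caps the total time spent retrying the
// claim and unmount RPCs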
minBackoffInterval time.Duration
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
}
@@ -47,6 +49,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
minBackoffInterval: time.Second,
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
}
@@ -213,11 +216,10 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
},
}

var resp structs.CSIVolumeClaimResponse
if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
resp, err := c.claimWithRetry(req)
if err != nil {
return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
}

if resp.Volume == nil {
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
}
@@ -230,6 +232,74 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
return result, nil
}

// claimWithRetry tries to claim the volume on the server, retrying
// with exponential backoff capped to a maximum interval
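// (with the defaults: 1s, 2s, 4s, ... capped at 1m between attempts,
// giving up entirely after 24h)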
func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()

var resp structs.CSIVolumeClaimResponse
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
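// the timer is created with a zero duration so that the first claim
// attempt is made immediately, before any backoff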
for {
select {
case <-ctx.Done():
return nil, err
case <-t.C:
}

err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
if err == nil {
break
}

if !isRetryableClaimRPCError(err) {
break
}

if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be claimed because it is in use, retrying",
"backoff", backoff)
t.Reset(backoff)
}
return &resp, err
}

// isRetryableClaimRPCError looks for errors where we need to retry
// with backoff because we expect them to be eventually resolved.
func isRetryableClaimRPCError(err error) bool {

// note: because these errors are returned via RPC which breaks error
// wrapping, we can't check with errors.Is and need to read the string
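// (errors cross the net/rpc boundary as plain strings)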
errMsg := err.Error()
if strings.Contains(errMsg, structs.ErrCSIVolumeMaxClaims.Error()) {
return true
}
if strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) {
return true
}
if strings.Contains(errMsg, "no servers") {
return true
}
if strings.Contains(errMsg, structs.ErrNoLeader.Error()) {
return true
}
return false
}

func (c *csiHook) shouldRun() bool {
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for _, vol := range tg.Volumes {
@@ -286,7 +356,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()
var err error
backoff := time.Second
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
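// as in claimWithRetry, the zero-duration timer makes the first
// unmount attempt immediate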
for {
Expand All @@ -307,6 +377,8 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be unmounted, retrying",
"backoff", backoff)
t.Reset(backoff)
}
return nil
147 changes: 120 additions & 27 deletions client/allocrunner/csi_hook_test.go
@@ -7,12 +7,14 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -34,6 +36,9 @@ func TestCSIHook(t *testing.T) {
testcases := []struct {
name string
volumeRequests map[string]*structs.VolumeRequest
startsUnschedulable bool
startsWithClaims bool
expectedClaimErr error
expectedMounts map[string]*csimanager.MountInfo
expectedMountCalls int
expectedUnmountCalls int
@@ -89,6 +94,58 @@ func TestCSIHook(t *testing.T) {
expectedUnpublishCalls: 1,
},

{
name: "fatal error on claim",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsUnschedulable: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 0,
expectedUnmountCalls: 0,
expectedClaimCalls: 1,
expectedUnpublishCalls: 0,
expectedClaimErr: errors.New(
"claim volumes: could not claim volume testvolume0: volume is currently unschedulable"),
},

{
name: "retryable error on claim",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsWithClaims: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 1,
expectedClaimCalls: 2,
expectedUnpublishCalls: 1,
},

// TODO: this won't actually work on the client.
// https://github.com/hashicorp/nomad/issues/11798
//
@@ -136,7 +193,12 @@ func TestCSIHook(t *testing.T) {

callCounts := map[string]int{}
mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
rpcer := mockRPCer{
alloc: alloc,
callCounts: callCounts,
hasExistingClaim: helper.BoolToPtr(tc.startsWithClaims),
schedulable: helper.BoolToPtr(!tc.startsUnschedulable),
}
ar := mockAllocRunner{
res: &cstructs.AllocHookResources{},
caps: &drivers.Capabilities{
@@ -145,17 +207,24 @@ func TestCSIHook(t *testing.T) {
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook.maxBackoffInterval = 100 * time.Millisecond
hook.maxBackoffDuration = 2 * time.Second
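// keep the backoff intervals short so the retry test cases finish quickly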
hook.minBackoffInterval = 1 * time.Millisecond
hook.maxBackoffInterval = 10 * time.Millisecond
hook.maxBackoffDuration = 500 * time.Millisecond

require.NotNil(t, hook)

require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
require.NoError(t, hook.Postrun())
}

require.NoError(t, hook.Postrun())
require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
@@ -168,25 +237,11 @@ func TestCSIHook(t *testing.T) {

// HELPERS AND MOCKS

func testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = true
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
}
return vol
}

type mockRPCer struct {
alloc *structs.Allocation
callCounts map[string]int
alloc *structs.Allocation
callCounts map[string]int
hasExistingClaim *bool
schedulable *bool
}

// RPC mocks the server RPCs, acting as though any request succeeds
Expand All @@ -195,7 +250,7 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
case "CSIVolume.Claim":
r.callCounts["claim"]++
req := args.(*structs.CSIVolumeClaimRequest)
vol := testVolume(req.VolumeID)
vol := r.testVolume(req.VolumeID)
err := vol.Claim(req.ToClaim(), r.alloc)
if err != nil {
return err
@@ -215,6 +270,44 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
return nil
}

// testVolume is a helper that builds a volume which optionally starts
// as unschedulable or already claimed until after the first claim RPC
// is made, so that we can test retryable vs non-retryable failures
func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = *r.schedulable
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
}

if *r.hasExistingClaim {
vol.AccessMode = structs.CSIVolumeAccessModeSingleNodeReader
vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
vol.ReadClaims["another-alloc-id"] = &structs.CSIVolumeClaim{
AllocationID: "another-alloc-id",
NodeID: "another-node-id",
Mode: structs.CSIVolumeClaimRead,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
}
}

// the claim counter was incremented before this helper was called, so
// after the first claim RPC we clear the simulated conflict and let
// retries succeed
if r.callCounts["claim"] >= 1 {
*r.hasExistingClaim = false
*r.schedulable = true
}

return vol
}

type mockVolumeMounter struct {
callCounts map[string]int
}