diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 6e4117250dc..0f4897f7ded 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -272,7 +272,7 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C } } c.logger.Debug( - "volume could not be claimed because it is in use, retrying in %v", backoff) + "volume could not be claimed because it is in use", "retry_in", backoff) t.Reset(backoff) } return &resp, err @@ -377,8 +377,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error { backoff = c.maxBackoffInterval } } - c.logger.Debug( - "volume could not be unmounted, retrying in %v", backoff) + c.logger.Debug("volume could not be unmounted", "retry_in", backoff) t.Reset(backoff) } return nil diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 120c058d034..525a86b4e4f 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2384,21 +2384,17 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { c := core.(*CoreScheduler) require.NoError(c.csiVolumeClaimGC(gc)) - // TODO(tgross): the condition below means this test doesn't tell - // us much; ideally we should be intercepting the claim request - // and verifying that we send the expected claims but we don't - // have test infra in place to do that for server RPCs - // sending the GC claim will trigger the volumewatcher's normal - // code path. but the volumewatcher will hit an error here - // because there's no path to the node, so we shouldn't see - // the WriteClaims removed + // code path. the volumewatcher will hit an error here because + // there's no path to the node, but this is a node-only plugin so + // we accept that the node has been GC'd and there's no point + // holding onto the claim require.Eventually(func() bool { vol, _ := state.CSIVolumeByID(ws, ns, volID) - return len(vol.WriteClaims) == 1 && - len(vol.WriteAllocs) == 1 && - len(vol.PastClaims) == 1 - }, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly") + return len(vol.WriteClaims) == 0 && + len(vol.WriteAllocs) == 0 && + len(vol.PastClaims) == 0 + }, time.Second*2, 10*time.Millisecond, "claims were not released") } diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 4043955cc2d..0cb45776104 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -472,15 +472,6 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS args.NodeID = alloc.NodeID } - if isNewClaim { - // if this is a new claim, add a Volume and PublishContext from the - // controller (if any) to the reply - err = v.controllerPublishVolume(args, reply) - if err != nil { - return fmt.Errorf("controller publish: %v", err) - } - } - resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args) if err != nil { v.logger.Error("csi raft apply failed", "error", err, "method", "claim") @@ -490,6 +481,15 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS return respErr } + if isNewClaim { + // if this is a new claim, add a Volume and PublishContext from the + // controller (if any) to the reply + err = v.controllerPublishVolume(args, reply) + if err != nil { + return fmt.Errorf("controller publish: %v", err) + } + } + reply.Index = index v.srv.setQueryMeta(&reply.QueryMeta) return nil @@ -570,7 +570,10 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, err = v.srv.RPC(method, cReq, cResp) if err != nil { - return fmt.Errorf("attach volume: %v", err) + if strings.Contains(err.Error(), "FailedPrecondition") { + return fmt.Errorf("%v: %v", structs.ErrCSIClientRPCRetryable, err) + } + return err } resp.PublishContext = cResp.PublishContext return nil @@ -725,6 +728,11 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C } func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + if claim.AccessMode == structs.CSIVolumeAccessModeUnknown { + // claim has already been released client-side + return nil + } + req := &cstructs.ClientCSINodeDetachVolumeRequest{ PluginID: vol.PluginID, VolumeID: vol.ID, @@ -820,17 +828,17 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str // and GC'd by this point, so looking there is the last resort. func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) { for _, rClaim := range vol.ReadClaims { - if rClaim.NodeID == claim.NodeID { + if rClaim.NodeID == claim.NodeID && rClaim.ExternalNodeID != "" { return rClaim.ExternalNodeID, nil } } for _, wClaim := range vol.WriteClaims { - if wClaim.NodeID == claim.NodeID { + if wClaim.NodeID == claim.NodeID && wClaim.ExternalNodeID != "" { return wClaim.ExternalNodeID, nil } } for _, pClaim := range vol.PastClaims { - if pClaim.NodeID == claim.NodeID { + if pClaim.NodeID == claim.NodeID && pClaim.ExternalNodeID != "" { return pClaim.ExternalNodeID, nil } } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 78e806a9926..254daea5016 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -256,7 +256,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) { } claimResp := &structs.CSIVolumeClaimResponse{} err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) - require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0), + require.EqualError(t, err, fmt.Sprintf("volume not found: %s", id0), "expected 'volume not found' error because volume hasn't yet been created") // Create a plugin and volume @@ -449,14 +449,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) { claimResp := &structs.CSIVolumeClaimResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) // Because the node is not registered - require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node") + require.EqualError(t, err, "controller publish: controller attach volume: No path to node") // The node SecretID is authorized for all policies claimReq.AuthToken = node.SecretID claimReq.Namespace = "" claimResp = &structs.CSIVolumeClaimResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) - require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node") + require.EqualError(t, err, "controller publish: controller attach volume: No path to node") } func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { @@ -515,7 +515,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { { name: "first unpublish", startingState: structs.CSIVolumeClaimStateTaken, - expectedErrMsg: "could not detach from node: No path to node", + expectedErrMsg: "could not detach from controller: controller detach volume: No path to node", }, }