Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: nomad volume detach command #8584

Merged
merged 7 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error {
return err
}

func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, w)
return err
}

// CSIVolumeAttachmentMode duplicated in nomad/structs/csi.go
type CSIVolumeAttachmentMode string

Expand Down
62 changes: 52 additions & 10 deletions command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,35 @@ func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *htt
// Tokenize the suffix of the path to get the volume id
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/volume/csi/")
tokens := strings.Split(reqSuffix, "/")
if len(tokens) > 2 || len(tokens) < 1 {
if len(tokens) < 1 {
return nil, CodedError(404, resourceNotFoundErr)
}
id := tokens[0]

switch req.Method {
case "GET":
return s.csiVolumeGet(id, resp, req)
case "PUT":
return s.csiVolumePut(id, resp, req)
case "DELETE":
return s.csiVolumeDelete(id, resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
if len(tokens) == 1 {
switch req.Method {
case http.MethodGet:
return s.csiVolumeGet(id, resp, req)
case http.MethodPut:
return s.csiVolumePut(id, resp, req)
case http.MethodDelete:
return s.csiVolumeDelete(id, resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}

if len(tokens) == 2 {
if tokens[1] != "detach" {
return nil, CodedError(404, resourceNotFoundErr)
}
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}
return s.csiVolumeDetach(id, resp, req)
}

return nil, CodedError(404, resourceNotFoundErr)
}

func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -149,6 +163,34 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h
return nil, nil
}

func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}

nodeID := req.URL.Query().Get("node")
if nodeID == "" {
return nil, CodedError(400, "detach requires node ID")
}

args := structs.CSIVolumeUnpublishRequest{
VolumeID: id,
Claim: &structs.CSIVolumeClaim{
NodeID: nodeID,
Mode: structs.CSIVolumeClaimRelease,
},
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.CSIVolumeUnpublishResponse
if err := s.agent.RPC("CSIVolume.Unpublish", &args, &out); err != nil {
return nil, err
}

setMeta(resp, &out.QueryMeta)
return nil, nil
}

// CSIPluginsRequest lists CSI plugins
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"volume detach": func() (cli.Command, error) {
return &VolumeDetachCommand{
Meta: meta,
}, nil
},
}

deprecated := map[string]cli.CommandFactory{
Expand Down
4 changes: 4 additions & 0 deletions command/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Usage: nomad volume <subcommand> [options]

$ nomad volume deregister <id>

Detach an unused volume:

$ nomad volume detach <vol id> <node id>

Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
Expand Down
97 changes: 97 additions & 0 deletions command/volume_detach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

type VolumeDetachCommand struct {
Meta
}

func (c *VolumeDetachCommand) Help() string {
helpText := `
Usage: nomad volume detach [options] <vol id> <node id>

Detach a volume from a Nomad client.

General Options:

` + generalOptionsUsage() + `

`
return strings.TrimSpace(helpText)
}

func (c *VolumeDetachCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{})
}

func (c *VolumeDetachCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}

resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil)
if err != nil {
return []string{}
}
matches := resp.Matches[contexts.Volumes]

resp, _, err = client.Search().PrefixSearch(a.Last, contexts.Nodes, nil)
if err != nil {
return []string{}
}
for _, match := range resp.Matches[contexts.Nodes] {
matches = append(matches, match)
}
return matches
})
}

func (c *VolumeDetachCommand) Synopsis() string {
return "Detach a volume"
}

func (c *VolumeDetachCommand) Name() string { return "volume detach" }

func (c *VolumeDetachCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }

if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}

// Check that we get exactly two arguments
args = flags.Args()
if l := len(args); l != 2 {
c.Ui.Error("This command takes two arguments: <vol id> <node id>")
c.Ui.Error(commandErrorText(c))
return 1
}
volID := args[0]
nodeID := args[1]

// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

err = client.CSIVolumes().Detach(volID, nodeID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error detaching volume: %s", err))
return 1
}

return 0
}
63 changes: 61 additions & 2 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,66 @@ RELEASE_CLAIM:
}

func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
if claim.AllocationID != "" {
err := v.nodeUnpublishVolumeImpl(vol, claim)
if err != nil {
return err
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
}

// The RPC sent from the 'nomad node detach' command won't have an
// allocation ID set so we try to unpublish every terminal or invalid
// alloc on the node
allocIDs := []string{}
state := v.srv.fsm.State()
vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
}
for allocID, alloc := range vol.ReadAllocs {
if alloc == nil {
rclaim, ok := vol.ReadClaims[allocID]
if ok && rclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else {
if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)
}
}
}
for allocID, alloc := range vol.WriteAllocs {
if alloc == nil {
wclaim, ok := vol.WriteClaims[allocID]
if ok && wclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else {
if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)
}
}
}
var merr multierror.Error
for _, allocID := range allocIDs {
claim.AllocationID = allocID
err := v.nodeUnpublishVolumeImpl(vol, claim)
if err != nil {
merr.Errors = append(merr.Errors, err)
}
}
err = merr.ErrorOrNil()
if err != nil {
return err
}

claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
}

func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
Expand All @@ -609,8 +669,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return fmt.Errorf("could not detach from node: %w", err)
}
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
return nil
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
Expand Down
Loading