Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add attach/detach operations to CSIConnection #3

Merged
merged 3 commits into from
Nov 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions pkg/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@ package connection
import (
"context"
"fmt"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/api/core/v1"
)

const (
nodeIDAnnotation = "nodeid.csi.volume.kubernetes.io/"

// Key for node name in NodeID
nodeNameKey = "Name"
)

// CSIConnection is gRPC connection to a remote CSI driver and abstracts all
Expand All @@ -38,6 +47,12 @@ type CSIConnection interface {
// PUBLISH_UNPUBLISH_VOLUME in ControllerGetCapabilities() gRPC call.
SupportsControllerPublish(ctx context.Context) (bool, error)

// Attach given volume to given node. Returns PublishVolumeInfo
Attach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) (map[string]string, error)

// Detach given volume from given node.
Detach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) error

// Close the connection
Close() error
}
Expand Down Expand Up @@ -153,6 +168,152 @@ func (c *csiConnection) SupportsControllerPublish(ctx context.Context) (bool, er
return false, nil
}

func (c *csiConnection) Attach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) (map[string]string, error) {
client := csi.NewControllerClient(c.conn)

if pv.Spec.CSI == nil {
return nil, fmt.Errorf("only CSI volumes are supported")
}

nodeID, err := getNodeID(pv.Spec.CSI.Driver, node)
if err != nil {
return nil, err
}

caps, err := getVolumeCapabilities(pv)
if err != nil {
return nil, err
}

req := csi.ControllerPublishVolumeRequest{
Version: &csiVersion,
VolumeHandle: &csi.VolumeHandle{
Id: pv.Spec.CSI.VolumeHandle,
// TODO: add metadata???
},
NodeId: nodeID,
VolumeCapability: caps,
Readonly: pv.Spec.CSI.ReadOnly,
UserCredentials: nil,
}

rsp, err := client.ControllerPublishVolume(ctx, &req)
if err != nil {
return nil, err
}
e := rsp.GetError()
if e != nil {
// TODO: report the right error
return nil, fmt.Errorf("error calling ControllerPublishVolume: %+v", e)
}

result := rsp.GetResult()
if result == nil {
return nil, fmt.Errorf("result is empty")
}

return result.PublishVolumeInfo, nil
}

func (c *csiConnection) Detach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) error {
client := csi.NewControllerClient(c.conn)

if pv.Spec.CSI == nil {
return fmt.Errorf("only CSI volumes are supported")
}

nodeID, err := getNodeID(pv.Spec.CSI.Driver, node)
if err != nil {
return err
}

req := csi.ControllerUnpublishVolumeRequest{
Version: &csiVersion,
VolumeHandle: &csi.VolumeHandle{
Id: pv.Spec.CSI.VolumeHandle,
// TODO: add metadata???
},
NodeId: nodeID,
UserCredentials: nil,
}

rsp, err := client.ControllerUnpublishVolume(ctx, &req)
if err != nil {
return err
}
e := rsp.GetError()
if e != nil {
// TODO: report the right error
return fmt.Errorf("error calling ControllerUnpublishVolume: %+v", e)
}

result := rsp.GetResult()
if result == nil {
return fmt.Errorf("result is empty")
}

return nil
}

func sanitizeDriverName(driver string) string {
// replace '/' with '_'
return strings.Replace(driver, "/", "_", -1)
}

func getNodeID(driver string, node *v1.Node) (*csi.NodeID, error) {
annotationName := nodeIDAnnotation + sanitizeDriverName(driver)
nodeID, ok := node.Annotations[annotationName]
if !ok {
return nil, fmt.Errorf("node %q has no NodeID for driver %q", node.Name, driver)
}
return &csi.NodeID{
Values: map[string]string{
// TODO: find out what key is expected.
nodeNameKey: nodeID,
},
}, nil
}

func getVolumeCapabilities(pv *v1.PersistentVolume) (*csi.VolumeCapability, error) {
m := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range pv.Spec.AccessModes {
m[mode] = true
}

cap := &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
// TODO: get FsType from somewhere
MountFlags: pv.Spec.MountOptions,
},
},
AccessMode: &csi.VolumeCapability_AccessMode{},
}

// Translate array of modes into single VolumeCapability
switch {
case m[v1.ReadWriteMany]:
// ReadWriteMany trumps everything, regardless what other modes are set
cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER

case m[v1.ReadOnlyMany] && m[v1.ReadWriteOnce]:
// This is no way how to translate this to CSI...
return nil, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOnce on the same PersistentVolume")

case m[v1.ReadOnlyMany]:
// There is only ReadOnlyMany set
cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY

case m[v1.ReadWriteOnce]:
// There is only ReadWriteOnce set
cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER

default:
return nil, fmt.Errorf("unsupported AccessMode combination: %+v", pv.Spec.AccessModes)
}
return cap, nil
}

func (c *csiConnection) Close() error {
return c.conn.Close()
}
Loading